CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

struct  AliasInfo
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkersBeginEnd () const
 returns the collection of pointers to workers More...
 
AllWorkers const & allWorkersLumisAndEvents () const
 
AllWorkers const & allWorkersRuns () const
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream (ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
AllWorkers const & unscheduledWorkersLumisAndEvents () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
 
void finishedPaths (std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void handleException (StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
template<typename T >
void postScheduleSignal (StreamContext const *, std::exception_ptr &) const noexcept
 
template<typename T >
void preScheduleSignal (StreamContext const *) const
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
std::vector< Worker * > tryToPlaceConditionalModules (Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManagerBeginEnd_
 
WorkerManager workerManagerLumisAndEvents_
 
WorkerManager workerManagerRuns_
 

Detailed Description

Definition at line 124 of file StreamSchedule.h.

Member Typedef Documentation

◆ AllWorkers

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 131 of file StreamSchedule.h.

◆ PathWorkers

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 135 of file StreamSchedule.h.

◆ TrigPaths

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 127 of file StreamSchedule.h.

◆ TrigResConstPtr

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 129 of file StreamSchedule.h.

◆ TrigResPtr

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 128 of file StreamSchedule.h.

◆ vstring

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 126 of file StreamSchedule.h.

◆ WorkerPtr

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 130 of file StreamSchedule.h.

◆ Workers

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 133 of file StreamSchedule.h.

Constructor & Destructor Documentation

◆ StreamSchedule() [1/2]

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 371 of file StreamSchedule.cc.

References actions, actReg_, edm::WorkerManager::addToAllWorkers(), addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkersLumisAndEvents(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, CrabHelper::log, makePathStatusInserters(), HerwigMaxPtPartonFilter_cfi::moduleLabel, number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), streamID(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, edm::StreamID::value(), workerManagerBeginEnd_, workerManagerLumisAndEvents_, and workerManagerRuns_.

385  : workerManagerBeginEnd_(modReg, areg, actions),
386  workerManagerRuns_(modReg, areg, actions),
387  workerManagerLumisAndEvents_(modReg, areg, actions),
388  actReg_(areg),
389  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
391  trig_paths_(),
392  end_paths_(),
393  total_events_(),
394  total_passed_(),
397  streamContext_(streamID_, processContext) {
398  bool hasPath = false;
399  std::vector<std::string> const& pathNames = tns.getTrigPaths();
400  std::vector<std::string> const& endPathNames = tns.getEndPaths();
401 
402  ConditionalTaskHelper conditionalTaskHelper(
403  proc_pset, preg, &prealloc, processConfiguration, workerManagerLumisAndEvents_, pathNames);
404  std::unordered_set<std::string> conditionalModules;
405 
406  int trig_bitpos = 0;
407  trig_paths_.reserve(pathNames.size());
408  for (auto const& trig_name : pathNames) {
409  fillTrigPath(proc_pset,
410  preg,
411  &prealloc,
412  processConfiguration,
413  trig_bitpos,
414  trig_name,
415  results(),
416  endPathNames,
417  conditionalTaskHelper,
418  conditionalModules);
419  ++trig_bitpos;
420  hasPath = true;
421  }
422 
423  if (hasPath) {
424  // the results inserter stands alone
425  inserter->setTrigResultForStream(streamID.value(), results());
426 
427  results_inserter_ = makeInserter(actions, actReg_, inserter);
429  }
430 
431  // fill normal endpaths
432  int bitpos = 0;
433  end_paths_.reserve(endPathNames.size());
434  for (auto const& end_path_name : endPathNames) {
435  fillEndPath(proc_pset,
436  preg,
437  &prealloc,
438  processConfiguration,
439  bitpos,
440  end_path_name,
441  endPathNames,
442  conditionalTaskHelper,
443  conditionalModules);
444  ++bitpos;
445  }
446 
447  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
448 
449  //See if all modules were used
450  std::set<std::string> usedWorkerLabels;
451  for (auto const& worker : allWorkersLumisAndEvents()) {
452  usedWorkerLabels.insert(worker->description()->moduleLabel());
453  }
454  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
455  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
456  std::vector<std::string> unusedLabels;
457  set_difference(modulesInConfigSet.begin(),
458  modulesInConfigSet.end(),
459  usedWorkerLabels.begin(),
460  usedWorkerLabels.end(),
461  back_inserter(unusedLabels));
462  std::set<std::string> unscheduledLabels;
463  std::vector<std::string> shouldBeUsedLabels;
464  if (!unusedLabels.empty()) {
465  //Need to
466  // 1) create worker
467  // 2) if it is a WorkerT<EDProducer>, add it to our list
468  // 3) hand list to our delayed reader
469  for (auto const& label : unusedLabels) {
470  bool isTracked;
471  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
472  assert(isTracked);
473  assert(modulePSet != nullptr);
475  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
476  }
477  if (!shouldBeUsedLabels.empty()) {
478  std::ostringstream unusedStream;
479  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
480  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
481  itLabelEnd = shouldBeUsedLabels.end();
482  itLabel != itLabelEnd;
483  ++itLabel) {
484  unusedStream << ",'" << *itLabel << "'";
485  }
486  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
487  }
488  }
489  number_of_unscheduled_modules_ = unscheduledLabels.size();
490 
491  // Print conditional modules that were not consumed in any of their associated Paths
492  if (streamID.value() == 0 and not conditionalModules.empty()) {
493  // Intersection of unscheduled and ConditionalTask modules gives
494  // directly the set of conditional modules that were not
495  // consumed by anything in the Paths associated to the
496  // corresponding ConditionalTask.
497  std::vector<std::string_view> labelsToPrint;
498  std::copy_if(
499  unscheduledLabels.begin(),
500  unscheduledLabels.end(),
501  std::back_inserter(labelsToPrint),
502  [&conditionalModules](auto const& lab) { return conditionalModules.find(lab) != conditionalModules.end(); });
503 
504  if (not labelsToPrint.empty()) {
505  edm::LogWarning log("NonConsumedConditionalModules");
506  log << "The following modules were part of some ConditionalTask, but were not\n"
507  << "consumed by any other module in any of the Paths to which the ConditionalTask\n"
508  << "was associated. Perhaps they should be either removed from the\n"
509  << "job, or moved to a Task to make it explicit they are unscheduled.\n";
510  for (auto const& modLabel : labelsToPrint) {
511  log.format("\n {}", modLabel);
512  }
513  }
514  }
515 
516  for (auto const& worker : allWorkersLumisAndEvents()) {
517  std::string const& moduleLabel = worker->description()->moduleLabel();
518 
519  // The new worker pointers will be null for the TriggerResultsInserter, PathStatusInserter, and
520  // EndPathStatusInserter because there are no ParameterSets for those in the configuration.
521  // We could add special code to create workers for those, but instead we skip them because they
522  // do not have beginStream, endStream, or run/lumi begin/end stream transition functions.
523 
524  Worker* workerBeginEnd =
525  getWorker(moduleLabel, proc_pset, workerManagerBeginEnd_, preg, &prealloc, processConfiguration);
526  if (workerBeginEnd) {
527  workerManagerBeginEnd_.addToAllWorkers(workerBeginEnd);
528  }
529 
530  Worker* workerRuns = getWorker(moduleLabel, proc_pset, workerManagerRuns_, preg, &prealloc, processConfiguration);
531  if (workerRuns) {
533  }
534  }
535 
536  } // StreamSchedule::StreamSchedule
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
std::shared_ptr< ActivityRegistry > actReg_
constexpr element_type const * get() const
char const * label
WorkerManager workerManagerRuns_
TrigResConstPtr results() const
StreamContext streamContext_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
Log< level::Info, false > LogInfo
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
edm::propagate_const< TrigResPtr > results_
AllWorkers const & allWorkersLumisAndEvents() const
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
unsigned int value() const
Definition: StreamID.h:43
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
WorkerManager workerManagerLumisAndEvents_
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
void addToAllWorkers(Worker *w)

◆ StreamSchedule() [2/2]

edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

◆ actionTable()

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 245 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManagerLumisAndEvents_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:87
WorkerManager workerManagerLumisAndEvents_

◆ addToAllWorkers()

void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

◆ allWorkersBeginEnd()

AllWorkers const& edm::StreamSchedule::allWorkersBeginEnd ( ) const
inline

returns the collection of pointers to workers

Definition at line 225 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManagerBeginEnd_.

Referenced by replaceModule().

WorkerManager workerManagerBeginEnd_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82

◆ allWorkersLumisAndEvents()

AllWorkers const& edm::StreamSchedule::allWorkersLumisAndEvents ( ) const
inline

◆ allWorkersRuns()

AllWorkers const& edm::StreamSchedule::allWorkersRuns ( ) const
inline

Definition at line 226 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManagerRuns_.

Referenced by replaceModule().

226 { return workerManagerRuns_.allWorkers(); }
WorkerManager workerManagerRuns_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82

◆ availablePaths()

void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 1312 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

1312  {
1313  oLabelsToFill.reserve(trig_paths_.size());
1314  std::transform(trig_paths_.begin(),
1315  trig_paths_.end(),
1316  std::back_inserter(oLabelsToFill),
1317  std::bind(&Path::name, std::placeholders::_1));
1318  }
std::string const & name() const
Definition: Path.h:66
unsigned transform(const HcalDetId &id, unsigned transformCode)

◆ beginStream()

void edm::StreamSchedule::beginStream ( )

Definition at line 1007 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), CMS_SA_ALLOW, handleException(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::StreamContext::kInvalid, edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), streamContext_, streamID_, and workerManagerBeginEnd_.

1007  {
1009  streamContext_.setEventID(EventID(0, 0, 0));
1013 
1014  std::exception_ptr exceptionInStream;
1015  CMS_SA_ALLOW try {
1016  preScheduleSignal<BeginStreamTraits>(&streamContext_);
1018  } catch (...) {
1019  exceptionInStream = std::current_exception();
1020  }
1021 
1022  postScheduleSignal<BeginStreamTraits>(&streamContext_, exceptionInStream);
1023 
1024  if (exceptionInStream) {
1025  bool cleaningUpAfterException = false;
1026  handleException(streamContext_, cleaningUpAfterException, exceptionInStream);
1027  }
1029 
1030  if (exceptionInStream) {
1031  std::rethrow_exception(exceptionInStream);
1032  }
1033  }
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
WorkerManager workerManagerBeginEnd_
void setTransition(Transition v)
Definition: StreamContext.h:66
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
StreamContext streamContext_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void handleException(StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
void setEventID(EventID const &v)
Definition: StreamContext.h:67
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
void beginStream(StreamID, StreamContext const &)

◆ clearCounters()

void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 1444 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

1444  {
1445  using std::placeholders::_1;
1447  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
1448  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
1450  }
void clearCounters() noexcept
Definition: Worker.h:234
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
AllWorkers const & allWorkersLumisAndEvents() const
void clearCounters()
Definition: Path.cc:183

◆ context()

StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 234 of file StreamSchedule.h.

References streamContext_.

234 { return streamContext_; }
StreamContext streamContext_

◆ deleteModule()

void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 1096 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), workerManagerBeginEnd_, workerManagerLumisAndEvents_, and workerManagerRuns_.

1096  {
1100  }
void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManagerBeginEnd_
WorkerManager workerManagerRuns_
WorkerManager workerManagerLumisAndEvents_

◆ endStream()

void edm::StreamSchedule::endStream ( ExceptionCollector &  collector,
std::mutex &  collectorMutex 
)
noexcept

Definition at line 1035 of file StreamSchedule.cc.

References CMS_SA_ALLOW, edm::WorkerManager::endStream(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kEndStream, edm::StreamContext::kInvalid, edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), streamContext_, streamID_, and workerManagerBeginEnd_.

1035  {
1037  streamContext_.setEventID(EventID(0, 0, 0));
1041 
1042  std::exception_ptr exceptionInStream;
1043  CMS_SA_ALLOW try {
1044  preScheduleSignal<EndStreamTraits>(&streamContext_);
1045  workerManagerBeginEnd_.endStream(streamID_, streamContext_, collector, collectorMutex);
1046  } catch (...) {
1047  exceptionInStream = std::current_exception();
1048  }
1049 
1050  postScheduleSignal<EndStreamTraits>(&streamContext_, exceptionInStream);
1051 
1052  if (exceptionInStream) {
1053  std::lock_guard<std::mutex> collectorLock(collectorMutex);
1054  collector.call([&exceptionInStream]() { std::rethrow_exception(exceptionInStream); });
1055  }
1057  }
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
#define CMS_SA_ALLOW
WorkerManager workerManagerBeginEnd_
void setTransition(Transition v)
Definition: StreamContext.h:66
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
StreamContext streamContext_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
void setEventID(EventID const &v)
Definition: StreamContext.h:67
void endStream(StreamID, StreamContext const &, ExceptionCollector &, std::mutex &collectorMutex) noexcept
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68

◆ fillEndPath()

void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 969 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, mergeVDriftHistosByStation::name, and streamContext_.

Referenced by StreamSchedule().

977  {
978  PathWorkers tmpworkers;
979  fillWorkers(proc_pset,
980  preg,
981  prealloc,
982  processConfiguration,
983  name,
984  true,
985  tmpworkers,
986  endPathNames,
987  conditionalTaskHelper,
988  allConditionalModules);
989 
990  if (!tmpworkers.empty()) {
991  end_paths_.emplace_back(bitpos,
992  name,
993  tmpworkers,
994  TrigResPtr(),
995  actionTable(),
996  actReg_,
999  } else {
1000  empty_end_paths_.push_back(bitpos);
1001  }
1002  for (WorkerInPath const& workerInPath : tmpworkers) {
1003  addToAllWorkers(workerInPath.getWorker());
1004  }
1005  }
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_

◆ fillTrigPath()

void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 935 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, mergeVDriftHistosByStation::name, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

944  {
945  PathWorkers tmpworkers;
946  fillWorkers(proc_pset,
947  preg,
948  prealloc,
949  processConfiguration,
950  name,
951  false,
952  tmpworkers,
953  endPathNames,
954  conditionalTaskHelper,
955  allConditionalModules);
956 
957  // an empty path will cause an extra bit that is not used
958  if (!tmpworkers.empty()) {
959  trig_paths_.emplace_back(
960  bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
961  } else {
962  empty_trig_paths_.push_back(bitpos);
963  }
964  for (WorkerInPath const& workerInPath : tmpworkers) {
965  addToAllWorkers(workerInPath.getWorker());
966  }
967  }
ExceptionToActionTable const & actionTable() const
returns the action table
std::vector< int > empty_trig_paths_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void addToAllWorkers(Worker *w)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_

◆ fillWorkers()

void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames,
ConditionalTaskHelper const &  conditionalTaskHelper,
std::unordered_set< std::string > &  allConditionalModules 
)
private

Definition at line 827 of file StreamSchedule.cc.

References edm::ConditionalTaskHelper::aliasMap(), edm::ConditionalTaskHelper::conditionalModuleBranches(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerInPath::Ignore, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::Worker::kFilter, HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), mergeVDriftHistosByStation::name, edm::WorkerInPath::Normal, or, MillePedeFileConverter_cfg::out, hltMonBTagIPClient_cfi::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, tryToPlaceConditionalModules(), edm::WorkerInPath::Veto, and workerManagerLumisAndEvents_.

Referenced by fillEndPath(), and fillTrigPath().

836  {
837  vstring modnames = proc_pset.getParameter<vstring>(pathName);
838  PathWorkers tmpworkers;
839 
840  //Pull out ConditionalTask modules
841  auto condRange = findConditionalTaskModulesRange(modnames);
842 
843  std::unordered_set<std::string> conditionalmods;
844  //An EDAlias may be redirecting to a module on a ConditionalTask
845  std::unordered_multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
846  std::unordered_map<std::string, unsigned int> conditionalModOrder;
847  if (condRange.first != condRange.second) {
848  for (auto it = condRange.first; it != condRange.second; ++it) {
849  // ordering needs to skip the # token in the path list
850  conditionalModOrder.emplace(*it, it - modnames.begin() - 1);
851  }
852  //the last entry should be ignored since it is required to be "@"
853  conditionalmods = std::unordered_set<std::string>(std::make_move_iterator(condRange.first),
854  std::make_move_iterator(condRange.second));
855 
856  conditionalModsBranches = conditionalTaskHelper.conditionalModuleBranches(conditionalmods);
857  modnames.erase(std::prev(condRange.first), modnames.end());
858 
859  // Make a union of all conditional modules from all Paths
860  allConditionalModules.insert(conditionalmods.begin(), conditionalmods.end());
861  }
862 
863  unsigned int placeInPath = 0;
864  for (auto const& name : modnames) {
865  //Modules except EDFilters are set to run concurrently by default
866  bool doNotRunConcurrently = false;
868  if (name[0] == '!') {
869  filterAction = WorkerInPath::Veto;
870  } else if (name[0] == '-' or name[0] == '+') {
871  filterAction = WorkerInPath::Ignore;
872  }
873  if (name[0] == '|' or name[0] == '+') {
874  //cms.wait was specified so do not run concurrently
875  doNotRunConcurrently = true;
876  }
877 
879  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
880  moduleLabel.erase(0, 1);
881  }
882 
883  Worker* worker =
884  getWorker(moduleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
885  if (worker == nullptr) {
886  std::string pathType("endpath");
887  if (!search_all(endPathNames, pathName)) {
888  pathType = std::string("path");
889  }
891  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
892  << "\"\n please check spelling or remove that label from the path.";
893  }
894 
895  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
896  // We have a filter on an end path, and the filter is not explicitly ignored.
897  // See if the filter is allowed.
898  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
899  if (!search_all(allowed_filters, worker->description()->moduleName())) {
900  // Filter is not allowed. Ignore the result, and issue a warning.
901  filterAction = WorkerInPath::Ignore;
902  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
903  << "' with module label '" << moduleLabel << "' appears on EndPath '"
904  << pathName << "'.\n"
905  << "The return value of the filter will be ignored.\n"
906  << "To suppress this warning, either remove the filter from the endpath,\n"
907  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
908  }
909  }
910  bool runConcurrently = not doNotRunConcurrently;
911  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
912  runConcurrently = false;
913  }
914 
915  auto condModules = tryToPlaceConditionalModules(worker,
916  conditionalmods,
917  conditionalModsBranches,
918  conditionalTaskHelper.aliasMap(),
919  proc_pset,
920  preg,
921  prealloc,
922  processConfiguration);
923  for (auto condMod : condModules) {
924  tmpworkers.emplace_back(
925  condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true);
926  }
927 
928  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
929  ++placeInPath;
930  }
931 
932  out.swap(tmpworkers);
933  }
vector< string > vstring
Definition: ExoticaDQM.cc:7
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
std::vector< WorkerInPath > PathWorkers
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning
WorkerManager workerManagerLumisAndEvents_

◆ finishedPaths()

void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr *> &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo info 
)
private

Definition at line 1223 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, info(), edm::printCmsExceptionWarning(), results_, results_inserter_, streamContext_, streamID_, total_passed_, and edm::exception_actions::TryToContinue.

Referenced by processOneEventAsync().

1225  {
1226  if (iExcept) {
1227  // Caught exception is propagated via WaitingTaskHolder
1228  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
1232  edm::printCmsExceptionWarning("TryToContinue", e);
1233  *(iExcept.load()) = std::exception_ptr();
1234  } else {
1235  *(iExcept.load()) = std::current_exception();
1236  }
1237  } catch (...) {
1238  *(iExcept.load()) = std::current_exception();
1239  }
1240  }
1241 
1242  if ((not iExcept) and results_->accept()) {
1243  ++total_passed_;
1244  }
1245 
1246  if (nullptr != results_inserter_.get()) {
1247  // Caught exception is propagated to the caller
1248  CMS_SA_ALLOW try {
1249  //Even if there was an exception, we need to allow results inserter
1250  // to run since some module may be waiting on its results.
1251  ParentContext parentContext(&streamContext_);
1252  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1253 
1254  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
1255  if (expt) {
1256  std::rethrow_exception(expt);
1257  }
1258  } catch (cms::Exception& ex) {
1259  if (not iExcept) {
1260  if (ex.context().empty()) {
1261  std::ostringstream ost;
1262  ost << "Processing Event " << info.principal().id();
1263  ex.addContext(ost.str());
1264  }
1265  iExcept.store(new std::exception_ptr(std::current_exception()));
1266  }
1267  } catch (...) {
1268  if (not iExcept) {
1269  iExcept.store(new std::exception_ptr(std::current_exception()));
1270  }
1271  }
1272  }
1273  std::exception_ptr ptr;
1274  if (iExcept) {
1275  ptr = *iExcept.load();
1276  }
1277  iWait.doneWaiting(ptr);
1278  }
ExceptionToActionTable const & actionTable() const
returns the action table
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
constexpr element_type const * get() const
StreamContext streamContext_
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:169
std::list< std::string > const & context() const
Definition: Exception.cc:151
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)

◆ finishProcessOneEvent()

std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 1280 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

1280  {
1281  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1282 
1283  if (iExcept) {
1284  //add context information to the exception and print message
1285  try {
1286  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
1287  } catch (cms::Exception& ex) {
1288  bool const cleaningUpAfterException = false;
1289  if (ex.context().empty()) {
1290  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
1291  } else {
1292  addContextAndPrintException("", ex, cleaningUpAfterException);
1293  }
1294  iExcept = std::current_exception();
1295  }
1296 
1297  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
1298  }
1299  // Caught exception is propagated to the caller
1300  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
1301  if (not iExcept) {
1302  iExcept = std::current_exception();
1303  }
1304  }
1305  if (not iExcept) {
1306  resetEarlyDelete();
1307  }
1308 
1309  return iExcept;
1310  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ getAllModuleDescriptions()

std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 1102 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), AlCaHLTBitMon_ParallelJobs::p, and mps_fire::result.

1102  {
1103  std::vector<ModuleDescription const*> result;
1104  result.reserve(allWorkersLumisAndEvents().size());
1105 
1106  for (auto const& worker : allWorkersLumisAndEvents()) {
1107  ModuleDescription const* p = worker->description();
1108  result.push_back(p);
1109  }
1110  return result;
1111  }
size
Write out results.
AllWorkers const & allWorkersLumisAndEvents() const

◆ getTriggerReport()

void edm::StreamSchedule::getTriggerReport ( TriggerReport &  rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 1434 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), end_paths_, edm::fillPathSummary(), edm::fillWorkerSummary(), cuy::rep, totalEvents(), totalEventsFailed(), totalEventsPassed(), and trig_paths_.

1434  {
1435  rep.eventSummary.totalEvents += totalEvents();
1436  rep.eventSummary.totalEventsPassed += totalEventsPassed();
1437  rep.eventSummary.totalEventsFailed += totalEventsFailed();
1438 
1439  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
1440  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
1441  fill_summary(allWorkersLumisAndEvents(), rep.workerSummaries, &fillWorkerSummary);
1442  }
int totalEventsPassed() const
int totalEventsFailed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
int totalEvents() const
AllWorkers const & allWorkersLumisAndEvents() const
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)

◆ handleException()

void edm::StreamSchedule::handleException ( StreamContext const &  streamContext,
bool  cleaningUpAfterException,
std::exception_ptr &  excpt 
) const
private noexcept

Definition at line 1521 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::exceptionContext(), edm::ExceptionFromThisContext, and edm::convertException::wrap().

Referenced by beginStream(), and processOneStreamAsync().

1523  {
1524  //add context information to the exception and print message
1525  try {
1526  convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
1527  } catch (cms::Exception& ex) {
1528  std::ostringstream ost;
1529  // In most cases the exception will already have context at this point,
1530  // but add some context here in those rare cases where it does not.
1531  if (ex.context().empty()) {
1532  exceptionContext(ost, streamContext);
1533  }
1534  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
1535  excpt = std::current_exception();
1536  }
1537  // We are already handling an earlier exception, so ignore it
1538  // if this signal results in another exception being thrown.
1539  CMS_SA_ALLOW try {
1540  actReg_->preStreamEarlyTerminationSignal_(streamContext, TerminationOrigin::ExceptionFromThisContext);
1541  } catch (...) {
1542  }
1543  }
#define CMS_SA_ALLOW
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
auto wrap(F iFunc) -> decltype(iFunc())
std::list< std::string > const & context() const
Definition: Exception.cc:151

◆ initializeEarlyDelete()

void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
std::multimap< std::string, std::string > const &  referencesToBranches,
std::vector< std::string > const &  modulesToSkip,
edm::ProductRegistry const &  preg 
)

Definition at line 538 of file StreamSchedule.cc.

References allWorkersLumisAndEvents(), cms::cuda::assert(), MicroEventContent_cff::branch, edm::maker::ModuleHolder::createOutputModuleCommunicator(), dumpMFGeometry_cfg::delta, submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), dqm-mbProfile::format, newFWLiteAna::found, edm::InEvent, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, B2GTnPMonitor_cfi::item, ALPAKA_ACCELERATOR_NAMESPACE::pixelClustering::pixelStatus::kEmpty, MainPageGenerator::l, create_idmaps::n, AlCaHLTBitMon_ParallelJobs::p, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, groupFilesInBlocks::temp, trig_paths_, mitigatedMETSequence_cff::U, and w().

542  {
543  // setup the list with those products actually registered for this job
544  std::multimap<std::string, Worker*> branchToReadingWorker;
545  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
546 
547  const std::vector<std::string> kEmpty;
548  std::map<Worker*, unsigned int> reserveSizeForWorker;
549  unsigned int upperLimitOnReadingWorker = 0;
550  unsigned int upperLimitOnIndicies = 0;
551  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
552 
553  //talk with output modules first
554  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
555  auto comm = iHolder->createOutputModuleCommunicator();
556  if (comm) {
557  if (!branchToReadingWorker.empty()) {
558  //If an OutputModule needs a product, we can't delete it early
559  // so we should remove it from our list
560  SelectedProductsForBranchType const& kept = comm->keptProducts();
561  for (auto const& item : kept[InEvent]) {
562  BranchDescription const& desc = *item.first;
563  auto found = branchToReadingWorker.equal_range(desc.branchName());
564  if (found.first != found.second) {
565  --nUniqueBranchesToDelete;
566  branchToReadingWorker.erase(found.first, found.second);
567  }
568  }
569  }
570  }
571  });
572 
573  if (branchToReadingWorker.empty()) {
574  return;
575  }
576 
577  std::unordered_set<std::string> modulesToExclude(modulesToSkip.begin(), modulesToSkip.end());
578  for (auto w : allWorkersLumisAndEvents()) {
579  if (modulesToExclude.end() != modulesToExclude.find(w->description()->moduleLabel())) {
580  continue;
581  }
582  //determine if this module could read a branch we want to delete early
583  auto consumes = w->consumesInfo();
584  if (not consumes.empty()) {
585  bool foundAtLeastOneMatchingBranch = false;
586  for (auto const& product : consumes) {
587  std::string branch = fmt::format("{}_{}_{}_{}",
588  product.type().friendlyClassName(),
589  product.label().data(),
590  product.instance().data(),
591  product.process().data());
592  {
593  //Handle case where worker directly consumes product
594  auto found = branchToReadingWorker.end();
595  if (product.process().empty()) {
596  auto startFound = branchToReadingWorker.lower_bound(branch);
597  if (startFound != branchToReadingWorker.end()) {
598  if (startFound->first.substr(0, branch.size()) == branch) {
599  //match all processNames here, even if it means multiple matches will happen
600  found = startFound;
601  }
602  }
603  } else {
604  auto exactFound = branchToReadingWorker.equal_range(branch);
605  if (exactFound.first != exactFound.second) {
606  found = exactFound.first;
607  }
608  }
609  if (found != branchToReadingWorker.end()) {
610  if (not foundAtLeastOneMatchingBranch) {
611  ++upperLimitOnReadingWorker;
612  foundAtLeastOneMatchingBranch = true;
613  }
614  ++upperLimitOnIndicies;
615  ++reserveSizeForWorker[w];
616  if (nullptr == found->second) {
617  found->second = w;
618  } else {
619  branchToReadingWorker.insert(make_pair(found->first, w));
620  }
621  }
622  }
623  {
624  //Handle case where indirectly consumes product
625  auto found = referencesToBranches.end();
626  if (product.process().empty()) {
627  auto startFound = referencesToBranches.lower_bound(branch);
628  if (startFound != referencesToBranches.end()) {
629  if (startFound->first.substr(0, branch.size()) == branch) {
630  //match all processNames here, even if it means multiple matches will happen
631  found = startFound;
632  }
633  }
634  } else {
635  //can match exactly
636  auto exactFound = referencesToBranches.equal_range(branch);
637  if (exactFound.first != exactFound.second) {
638  found = exactFound.first;
639  }
640  }
641  if (found != referencesToBranches.end()) {
642  for (auto itr = found; (itr != referencesToBranches.end()) and (itr->first == found->first); ++itr) {
643  auto foundInBranchToReadingWorker = branchToReadingWorker.find(itr->second);
644  if (foundInBranchToReadingWorker == branchToReadingWorker.end()) {
645  continue;
646  }
647  if (not foundAtLeastOneMatchingBranch) {
648  ++upperLimitOnReadingWorker;
649  foundAtLeastOneMatchingBranch = true;
650  }
651  ++upperLimitOnIndicies;
652  ++reserveSizeForWorker[w];
653  if (nullptr == foundInBranchToReadingWorker->second) {
654  foundInBranchToReadingWorker->second = w;
655  } else {
656  branchToReadingWorker.insert(make_pair(itr->second, w));
657  }
658  }
659  }
660  }
661  }
662  }
663  }
664  {
665  auto it = branchToReadingWorker.begin();
666  std::vector<std::string> unusedBranches;
667  while (it != branchToReadingWorker.end()) {
668  if (it->second == nullptr) {
669  unusedBranches.push_back(it->first);
670  //erasing the object invalidates the iterator so must advance it first
671  auto temp = it;
672  ++it;
673  branchToReadingWorker.erase(temp);
674  } else {
675  ++it;
676  }
677  }
678  if (not unusedBranches.empty()) {
679  LogWarning l("UnusedProductsForCanDeleteEarly");
680  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
681  " If possible, remove the producer from the job.";
682  for (auto const& n : unusedBranches) {
683  l << "\n " << n;
684  }
685  }
686  }
687  if (!branchToReadingWorker.empty()) {
688  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
689  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
690  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
691  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
692  std::string lastBranchName;
693  size_t nextOpenIndex = 0;
694  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
695  for (auto& branchAndWorker : branchToReadingWorker) {
696  if (lastBranchName != branchAndWorker.first) {
697  //have to put back the period we removed earlier in order to get the proper name
698  BranchID bid(branchAndWorker.first + ".");
699  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
700  lastBranchName = branchAndWorker.first;
701  }
702  auto found = alreadySeenWorkers.find(branchAndWorker.second);
703  if (alreadySeenWorkers.end() == found) {
704  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
705  // all the branches that might be read by this worker. However, initially we will only tell the
706  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
707  // EarlyDeleteHelper will automatically advance its internal end pointer.
708  size_t index = nextOpenIndex;
709  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
712  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
713  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
714  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
715  nextOpenIndex += nIndices;
716  } else {
717  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
718  }
719  }
720 
721  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
722  // space needed for each module
723  auto itLast = earlyDeleteHelpers_.begin();
724  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
725  if (itLast->end() != it->begin()) {
726  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
727  unsigned int delta = it->begin() - itLast->end();
728  it->shiftIndexPointers(delta);
729 
731  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
732  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
733  }
734  itLast = it;
735  }
737  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
739 
740  //now tell the paths about the deleters
741  for (auto& p : trig_paths_) {
742  p.setEarlyDeleteHelpers(alreadySeenWorkers);
743  }
744  for (auto& p : end_paths_) {
745  p.setEarlyDeleteHelpers(alreadySeenWorkers);
746  }
748  }
749  }
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
T w() const
std::vector< BranchToCount > earlyDeleteBranchToCount_
assert(be >=bs)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
AllWorkers const & allWorkersLumisAndEvents() const
Log< level::Warning, false > LogWarning
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ makePathStatusInserters()

void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 1470 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

1473  {
1474  int bitpos = 0;
1475  unsigned int indexEmpty = 0;
1476  unsigned int indexOfPath = 0;
1477  for (auto& pathStatusInserter : pathStatusInserters) {
1478  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
1479  WorkerPtr workerPtr(
1480  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1481  pathStatusInserterWorkers_.emplace_back(workerPtr);
1482  workerPtr->setActivityRegistry(actReg_);
1483  addToAllWorkers(workerPtr.get());
1484 
1485  // A little complexity here because a C++ Path object is not
1486  // instantiated and put into end_paths if there are no modules
1487  // on the configured path.
1488  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
1489  ++indexEmpty;
1490  } else {
1491  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
1492  ++indexOfPath;
1493  }
1494  ++bitpos;
1495  }
1496 
1497  bitpos = 0;
1498  indexEmpty = 0;
1499  indexOfPath = 0;
1500  for (auto& endPathStatusInserter : endPathStatusInserters) {
1501  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
1502  WorkerPtr workerPtr(
1503  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
1504  endPathStatusInserterWorkers_.emplace_back(workerPtr);
1505  workerPtr->setActivityRegistry(actReg_);
1506  addToAllWorkers(workerPtr.get());
1507 
1508  // A little complexity here because a C++ Path object is not
1509  // instantiated and put into end_paths if there are no modules
1510  // on the configured path.
1511  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
1512  ++indexEmpty;
1513  } else {
1514  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
1515  ++indexOfPath;
1516  }
1517  ++bitpos;
1518  }
1519  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_

◆ moduleDescriptionsInEndPath()

void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1362 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

1364  {
1365  descriptions.clear();
1366  bool found = false;
1367  TrigPaths::const_iterator itFound;
1368 
1369  if (hint < end_paths_.size()) {
1370  itFound = end_paths_.begin() + hint;
1371  if (itFound->name() == iEndPathLabel)
1372  found = true;
1373  }
1374  if (!found) {
1375  // if the hint did not work, do it the slow way
1376  itFound = std::find_if(
1377  end_paths_.begin(),
1378  end_paths_.end(),
1379  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1380  if (itFound != end_paths_.end())
1381  found = true;
1382  }
1383  if (found) {
1384  descriptions.reserve(itFound->size());
1385  for (size_t i = 0; i < itFound->size(); ++i) {
1386  descriptions.push_back(itFound->getWorker(i)->description());
1387  }
1388  }
1389  }
std::string const & name() const
Definition: Path.h:66

◆ moduleDescriptionsInPath()

void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const *> &  descriptions,
unsigned int  hint 
) const

Definition at line 1333 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

1335  {
1336  descriptions.clear();
1337  bool found = false;
1338  TrigPaths::const_iterator itFound;
1339 
1340  if (hint < trig_paths_.size()) {
1341  itFound = trig_paths_.begin() + hint;
1342  if (itFound->name() == iPathLabel)
1343  found = true;
1344  }
1345  if (!found) {
1346  // if the hint did not work, do it the slow way
1347  itFound = std::find_if(
1348  trig_paths_.begin(),
1349  trig_paths_.end(),
1350  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1351  if (itFound != trig_paths_.end())
1352  found = true;
1353  }
1354  if (found) {
1355  descriptions.reserve(itFound->size());
1356  for (size_t i = 0; i < itFound->size(); ++i) {
1357  descriptions.push_back(itFound->getWorker(i)->description());
1358  }
1359  }
1360  }
std::string const & name() const
Definition: Path.h:66

◆ modulesInPath()

void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

Adds to oLabelsToFill, in execution order, the labels of all modules in the path iPathLabel.

Definition at line 1320 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

1320  {
1321  TrigPaths::const_iterator itFound = std::find_if(
1322  trig_paths_.begin(),
1323  trig_paths_.end(),
1324  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
1325  if (itFound != trig_paths_.end()) {
1326  oLabelsToFill.reserve(itFound->size());
1327  for (size_t i = 0; i < itFound->size(); ++i) {
1328  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
1329  }
1330  }
1331  }
std::string const & name() const
Definition: Path.h:66

◆ numberOfUnscheduledModules()

unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 232 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_

◆ postScheduleSignal()

template<typename T >
void edm::StreamSchedule::postScheduleSignal ( StreamContext const *  streamContext,
std::exception_ptr &  excpt 
) const
private noexcept

Definition at line 437 of file StreamSchedule.h.

References cms::Exception::addContext(), edm::exceptionContext(), and edm::convertException::wrap().

438  {
439  try {
440  convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
441  } catch (cms::Exception& ex) {
442  if (not excpt) {
443  std::ostringstream ost;
444  ex.addContext("Handling post signal, likely in a service function");
445  exceptionContext(ost, *streamContext);
446  ex.addContext(ost.str());
447  excpt = std::current_exception();
448  }
449  }
450  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ preScheduleSignal()

template<typename T >
void edm::StreamSchedule::preScheduleSignal ( StreamContext const *  streamContext) const
private

Definition at line 424 of file StreamSchedule.h.

References actReg_, cms::Exception::addContext(), edm::exceptionContext(), and edm::convertException::wrap().

424  {
425  try {
426  convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
427  } catch (cms::Exception& ex) {
428  std::ostringstream ost;
429  ex.addContext("Handling pre signal, likely in a service function");
430  exceptionContext(ost, *streamContext);
431  ex.addContext(ost.str());
432  throw;
433  }
434  }
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< ActivityRegistry > actReg_
void addContext(std::string const &context)
Definition: Exception.cc:169
auto wrap(F iFunc) -> decltype(iFunc())

◆ processOneEventAsync()

void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 1113 of file StreamSchedule.cc.

References actReg_, cms::cuda::assert(), CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, ecalHexDisplay_cfg::ep, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManagerLumisAndEvents_.

1117  {
1118  EventPrincipal& ep = info.principal();
1119 
1120  // Caught exception is propagated via WaitingTaskHolder
1121  CMS_SA_ALLOW try {
1122  this->resetAll();
1123 
1124  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
1125 
1126  Traits::setStreamContext(streamContext_, ep);
1127  //a service may want to communicate with another service
1128  ServiceRegistry::Operate guard(serviceToken);
1129  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
1130 
1131  // Data dependencies need to be set up before marking empty
1132  // (End)Paths complete in case something consumes the status of
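1133  // the empty (EndPath)
1134  workerManagerLumisAndEvents_.setupResolvers(ep);
1135  workerManagerLumisAndEvents_.setupOnDemandSystem(info);
1136 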
1137  HLTPathStatus hltPathStatus(hlt::Pass, 0);
1138  for (int empty_trig_path : empty_trig_paths_) {
1139  results_->at(empty_trig_path) = hltPathStatus;
1140  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
1141  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
1142  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1143  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1144  if (except) {
1145  iTask.doneWaiting(except);
1146  return;
1147  }
1148  }
1149  if (not endPathStatusInserterWorkers_.empty()) {
1150  for (int empty_end_path : empty_end_paths_) {
1151  std::exception_ptr except =
1152  endPathStatusInserterWorkers_[empty_end_path]
1153  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1154  info, streamID_, ParentContext(&streamContext_), &streamContext_);
1155  if (except) {
1156  iTask.doneWaiting(except);
1157  return;
1158  }
1159  }
1160  }
1161 
1162  ++total_events_;
1163 
1164  //use to give priorities on an error to ones from Paths
1165  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
1166  auto pathErrorPtr = pathErrorHolder.get();
1167  ServiceWeakToken weakToken = serviceToken;
1168  auto allPathsDone = make_waiting_task(
1169  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
1170  ServiceRegistry::Operate operate(weakToken.lock());
1171 
1172  std::exception_ptr ptr;
1173  if (pathError->load()) {
1174  ptr = *pathError->load();
1175  delete pathError->load();
1176  }
1177  if ((not ptr) and iPtr) {
1178  ptr = *iPtr;
1179  }
1180  iTask.doneWaiting(finishProcessOneEvent(ptr));
1181  });
1182  //The holder guarantees that if the paths finish before the loop ends
1183  // that we do not start too soon. It also guarantees that the task will
1184  // run under that condition.
1185  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
1186 
1187  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
1188  std::exception_ptr const* iPtr) mutable {
1189  ServiceRegistry::Operate operate(weakToken.lock());
1190 
1191  if (iPtr) {
1192  // free previous value of pathErrorPtr, if any;
1193  // prioritize this error over one that happens in EndPath or Accumulate
1194  auto currentPtr = pathErrorPtr->exchange(new std::exception_ptr(*iPtr));
1195  assert(currentPtr == nullptr);
1196  }
1197  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
1198  });
1199 
1200  //The holder guarantees that if the paths finish before the loop ends
1201  // that we do not start too soon. It also guarantees that the task will
1202  // run under that condition.
1203  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
1204 
1205  //start end paths first so on single threaded the paths will run first
1206  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
1207  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
1208  it->processEventUsingPathAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
1209  }
1210 
1211  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
1212  it->processEventUsingPathAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
1213  }
1214 
1215  ParentContext parentContext(&streamContext_);
1216  workerManagerLumisAndEvents_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
1217  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
1218  } catch (...) {
1219  iTask.doneWaiting(std::current_exception());
1220  }
1221  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
assert(be >=bs)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
WorkerManager workerManagerLumisAndEvents_
def move(src, dest)
Definition: eostools.py:511

◆ processOneStreamAsync()

template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 359 of file StreamSchedule.h.

References CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), edm::WaitingTaskHolder::group(), watchdog::group, h, handleException(), info(), edm::InLumi, edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), eostools::move(), findAndChange::op, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, TrackValidation_cff::task, unpackBuffers-CaloStage2::token, edm::StreamID::value(), workerManagerLumisAndEvents_, and workerManagerRuns_.

362  {
363  auto group = iHolder.group();
364  auto const& principal = transitionInfo.principal();
365  T::setStreamContext(streamContext_, principal);
366 
367  ServiceWeakToken weakToken = token;
368  auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
369  std::exception_ptr const* iPtr) mutable {
370  std::exception_ptr excpt;
371  {
372  ServiceRegistry::Operate op(weakToken.lock());
373 
374  if (iPtr) {
375  excpt = *iPtr;
376  handleException(streamContext_, cleaningUpAfterException, excpt);
377  }
378  postScheduleSignal<T>(&streamContext_, excpt);
379  } // release service token before calling doneWaiting
380  iHolder.doneWaiting(excpt);
381  });
382 
383  auto task =
384  make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
385  auto token = weakToken.lock();
386  ServiceRegistry::Operate op(token);
387  // Caught exception is propagated via WaitingTaskHolder
388  WorkerManager* workerManager = &workerManagerRuns_;
389  if (T::branchType_ == InLumi) {
390  workerManager = &workerManagerLumisAndEvents_;
391  }
392  CMS_SA_ALLOW try {
393  preScheduleSignal<T>(&streamContext_);
394  workerManager->resetAll();
395  } catch (...) {
396  // Just remember the exception at this point,
397  // let the destructor of h call doneWaiting() so the
398  // ServiceRegistry::Operator object is destroyed first
399  h.presetTaskAsFailed(std::current_exception());
400  return;
401  }
402 
403  workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
404  });
405 
406  if (streamID_.value() == 0) {
407  //Enqueueing will start another thread if there is only
408  // one thread in the job. Having stream == 0 use spawn
409  // avoids starting up another thread when there is only one stream.
410  group->run([task]() {
411  TaskSentry s{task};
412  task->execute();
413  });
414  } else {
415  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
416  arena.enqueue([task]() {
417  TaskSentry s{task};
418  task->execute();
419  });
420  }
421  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
WorkerManager workerManagerRuns_
StreamContext streamContext_
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
void handleException(StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
unsigned int value() const
Definition: StreamID.h:43
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
WorkerManager workerManagerLumisAndEvents_
def move(src, dest)
Definition: eostools.py:511

◆ replaceModule()

void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

Replaces the module with label iLabel by the module held in iMod (each matching worker is handed to iMod->replaceModuleFor()).

Definition at line 1059 of file StreamSchedule.cc.

References cms::Exception::addContext(), allWorkersBeginEnd(), allWorkersLumisAndEvents(), allWorkersRuns(), edm::LuminosityBlockIndex::invalidLuminosityBlockIndex(), edm::RunIndex::invalidRunIndex(), edm::StreamContext::kBeginStream, edm::StreamContext::kInvalid, edm::maker::ModuleHolder::replaceModuleFor(), edm::StreamContext::setEventID(), edm::StreamContext::setLuminosityBlockIndex(), edm::StreamContext::setRunIndex(), edm::StreamContext::setTimestamp(), edm::StreamContext::setTransition(), streamContext_, and streamID_.

1059  {
1060  for (auto const& worker : allWorkersBeginEnd()) {
1061  if (worker->description()->moduleLabel() == iLabel) {
1062  iMod->replaceModuleFor(worker);
1063 
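1064  streamContext_.setTransition(StreamContext::Transition::kBeginStream);
1065  streamContext_.setEventID(EventID(0, 0, 0));
1066  streamContext_.setRunIndex(RunIndex::invalidRunIndex());
1067  streamContext_.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
1068  streamContext_.setTimestamp(Timestamp());
1069  try {
1070  worker->beginStream(streamID_, streamContext_);
1071  } catch (cms::Exception& ex) {
1072  streamContext_.setTransition(StreamContext::Transition::kInvalid);
1073  ex.addContext("Executing StreamSchedule::replaceModule");
1074  throw;
1075  }
1076  streamContext_.setTransition(StreamContext::Transition::kInvalid);
1077  break;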
1078  }
1079  }
1080 
1081  for (auto const& worker : allWorkersRuns()) {
1082  if (worker->description()->moduleLabel() == iLabel) {
1083  iMod->replaceModuleFor(worker);
1084  break;
1085  }
1086  }
1087 
1088  for (auto const& worker : allWorkersLumisAndEvents()) {
1089  if (worker->description()->moduleLabel() == iLabel) {
1090  iMod->replaceModuleFor(worker);
1091  break;
1092  }
1093  }
1094  }
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:70
void setTransition(Transition v)
Definition: StreamContext.h:66
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:69
StreamContext streamContext_
static LuminosityBlockIndex invalidLuminosityBlockIndex()
AllWorkers const & allWorkersLumisAndEvents() const
void addContext(std::string const &context)
Definition: Exception.cc:169
void setEventID(EventID const &v)
Definition: StreamContext.h:67
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:68
AllWorkers const & allWorkersRuns() const

◆ reportSkipped()

void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 353 of file StreamSchedule.h.

References ecalHexDisplay_cfg::ep.

353  {
354  Service<JobReport> reportSvc;
355  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
356  }

◆ resetAll()

void edm::StreamSchedule::resetAll ( )
private

Definition at line 1452 of file StreamSchedule.cc.

References results_.

Referenced by processOneEventAsync().

1452 { results_->reset(); }
edm::propagate_const< TrigResPtr > results_

◆ resetEarlyDelete()

void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 1456 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

1456  {
1457  //must be sure we have cleared the count first
1458  for (auto& count : earlyDeleteBranchToCount_) {
1459  count.count = 0;
1460  }
1461  //now reset based on how many helpers use that branch
1462  for (auto& index : earlyDeleteHelperToBranchIndicies_) {
1463  ++(earlyDeleteBranchToCount_[index].count);
1464  }
1465  for (auto& helper : earlyDeleteHelpers_) {
1466  helper.reset();
1467  }
1468  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_

◆ results() [1/2]

TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 298 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

298 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ results() [2/2]

TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 299 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

299 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_

◆ streamID()

StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 168 of file StreamSchedule.h.

References streamID_.

Referenced by StreamSchedule().

168 { return streamID_; }

◆ totalEvents()

int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 195 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

195 { return total_events_; }

◆ totalEventsFailed()

int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 203 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

203 { return totalEvents() - totalEventsPassed(); }
int totalEventsPassed() const
int totalEvents() const

◆ totalEventsPassed()

int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 199 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

199 { return total_passed_; }

◆ tryToPlaceConditionalModules()

std::vector< Worker * > edm::StreamSchedule::tryToPlaceConditionalModules ( Worker worker,
std::unordered_set< std::string > &  conditionalModules,
std::unordered_multimap< std::string, edm::BranchDescription const *> const &  conditionalModuleBranches,
std::unordered_multimap< std::string, AliasInfo > const &  aliasMap,
ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration 
)
private

Definition at line 751 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::TypeID::className(), edm::Worker::consumesInfo(), edm::Worker::description(), HerwigMaxPtPartonFilter_cfi::moduleLabel, edm::ModuleDescription::moduleLabel(), or, edm::PRODUCT_TYPE, AlCaHLTBitMon_QueryRunRegistry::string, edm::productholderindexhelper::typeIsViewCompatible(), and workerManagerLumisAndEvents_.

Referenced by fillWorkers().

759  {
760  std::vector<Worker*> returnValue;
761  auto const& consumesInfo = worker->consumesInfo();
762  auto moduleLabel = worker->description()->moduleLabel();
763  using namespace productholderindexhelper;
764  for (auto const& ci : consumesInfo) {
765  if (not ci.skipCurrentProcess() and
766  (ci.process().empty() or ci.process() == processConfiguration->processName())) {
767  auto productModuleLabel = std::string(ci.label());
768  bool productFromConditionalModule = false;
769  auto itFound = conditionalModules.find(productModuleLabel);
770  if (itFound == conditionalModules.end()) {
771  //Check to see if this was an alias
772  //note that aliasMap was previously filtered so only the conditional modules remain there
773  auto foundAlias = findBestMatchingAlias(conditionalModuleBranches, aliasMap, productModuleLabel, ci);
774  if (foundAlias) {
775  productModuleLabel = *foundAlias;
776  productFromConditionalModule = true;
777  itFound = conditionalModules.find(productModuleLabel);
778  //check that the alias-for conditional module has not been used
779  if (itFound == conditionalModules.end()) {
780  continue;
781  }
782  }
783  } else {
784  //need to check the rest of the data product info
785  auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
786  for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
787  if (itBranch->second->productInstanceName() == ci.instance()) {
788  if (ci.kindOfType() == PRODUCT_TYPE) {
789  if (ci.type() == itBranch->second->unwrappedTypeID()) {
790  productFromConditionalModule = true;
791  break;
792  }
793  } else {
794  //this is a view
795  if (typeIsViewCompatible(
796  ci.type(), TypeID(itBranch->second->wrappedType().typeInfo()), itBranch->second->className())) {
797  productFromConditionalModule = true;
798  break;
799  }
800  }
801  }
802  }
803  }
804  if (productFromConditionalModule) {
805  auto condWorker = getWorker(
806  productModuleLabel, proc_pset, workerManagerLumisAndEvents_, preg, prealloc, processConfiguration);
807  assert(condWorker);
808 
809  conditionalModules.erase(itFound);
810 
811  auto dependents = tryToPlaceConditionalModules(condWorker,
812  conditionalModules,
813  conditionalModuleBranches,
814  aliasMap,
815  proc_pset,
816  preg,
817  prealloc,
818  processConfiguration);
819  returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
820  returnValue.push_back(condWorker);
821  }
822  }
823  }
824  return returnValue;
825  }
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
assert(be >=bs)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
bool typeIsViewCompatible(TypeID const &requestedViewType, TypeID const &wrappedtypeID, std::string const &className)
WorkerManager workerManagerLumisAndEvents_

◆ unscheduledWorkersLumisAndEvents()

AllWorkers const& edm::StreamSchedule::unscheduledWorkersLumisAndEvents ( ) const
inline

Definition at line 229 of file StreamSchedule.h.

References edm::WorkerManager::unscheduledWorkers(), and workerManagerLumisAndEvents_.

229  {
230  return workerManagerLumisAndEvents_.unscheduledWorkers();
231  }
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:83
WorkerManager workerManagerLumisAndEvents_

Member Data Documentation

◆ actReg_

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private

◆ earlyDeleteBranchToCount_

std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 333 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelpers_

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 343 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ earlyDeleteHelperToBranchIndicies_

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 340 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

◆ empty_end_paths_

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 328 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

◆ empty_trig_paths_

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 327 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

◆ end_paths_

TrigPaths edm::StreamSchedule::end_paths_
private

◆ endPathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 323 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ number_of_unscheduled_modules_

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

◆ pathStatusInserterWorkers_

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 322 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

◆ results_

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 319 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

◆ results_inserter_

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 321 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

◆ streamContext_

StreamContext edm::StreamSchedule::streamContext_
private

◆ streamID_

StreamID edm::StreamSchedule::streamID_
private

◆ total_events_

int edm::StreamSchedule::total_events_
private

Definition at line 345 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

◆ total_passed_

int edm::StreamSchedule::total_passed_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

◆ trig_paths_

TrigPaths edm::StreamSchedule::trig_paths_
private

◆ workerManagerBeginEnd_

WorkerManager edm::StreamSchedule::workerManagerBeginEnd_
private

◆ workerManagerLumisAndEvents_

WorkerManager edm::StreamSchedule::workerManagerLumisAndEvents_
private

◆ workerManagerRuns_

WorkerManager edm::StreamSchedule::workerManagerRuns_
private