CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr
< HLTGlobalStatus const > 
TrigResConstPtr
 
typedef std::shared_ptr
< HLTGlobalStatus > 
TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void initializeEarlyDelete (ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector
< edm::propagate_const
< WorkerPtr > > 
endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector
< edm::propagate_const
< WorkerPtr > > 
pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 152 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 160 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 156 of file StreamSchedule.h.

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 164 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 155 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 158 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 157 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 154 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 159 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 162 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
BranchIDListHelper &  branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 136 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), trig_paths_, edm::StreamID::value(), and workerManager_.

151  : workerManager_(modReg, areg, actions),
152  actReg_(areg),
153  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
155  trig_paths_(),
156  end_paths_(),
157  total_events_(),
158  total_passed_(),
161  streamContext_(streamID_, processContext),
162  skippingEvent_(false) {
163  bool hasPath = false;
164  std::vector<std::string> const& pathNames = tns.getTrigPaths();
165  std::vector<std::string> const& endPathNames = tns.getEndPaths();
166 
167  int trig_bitpos = 0;
168  trig_paths_.reserve(pathNames.size());
169  for (auto const& trig_name : pathNames) {
170  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
171  ++trig_bitpos;
172  hasPath = true;
173  }
174 
175  if (hasPath) {
176  // the results inserter stands alone
177  inserter->setTrigResultForStream(streamID.value(), results());
178 
179  results_inserter_ = makeInserter(actions, actReg_, inserter);
181  }
182 
183  // fill normal endpaths
184  int bitpos = 0;
185  end_paths_.reserve(endPathNames.size());
186  for (auto const& end_path_name : endPathNames) {
187  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
188  ++bitpos;
189  }
190 
191  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
192 
193  //See if all modules were used
194  std::set<std::string> usedWorkerLabels;
195  for (auto const& worker : allWorkers()) {
196  usedWorkerLabels.insert(worker->description()->moduleLabel());
197  }
198  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
199  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
200  std::vector<std::string> unusedLabels;
201  set_difference(modulesInConfigSet.begin(),
202  modulesInConfigSet.end(),
203  usedWorkerLabels.begin(),
204  usedWorkerLabels.end(),
205  back_inserter(unusedLabels));
206  std::set<std::string> unscheduledLabels;
207  std::vector<std::string> shouldBeUsedLabels;
208  if (!unusedLabels.empty()) {
209  //Need to
210  // 1) create worker
211  // 2) if it is a WorkerT<EDProducer>, add it to our list
212  // 3) hand list to our delayed reader
213  for (auto const& label : unusedLabels) {
214  bool isTracked;
215  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
216  assert(isTracked);
217  assert(modulePSet != nullptr);
218  workerManager_.addToUnscheduledWorkers(
219  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
220  }
221  if (!shouldBeUsedLabels.empty()) {
222  std::ostringstream unusedStream;
223  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
224  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
225  itLabelEnd = shouldBeUsedLabels.end();
226  itLabel != itLabelEnd;
227  ++itLabel) {
228  unusedStream << ",'" << *itLabel << "'";
229  }
230  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
231  }
232  }
233  number_of_unscheduled_modules_ = unscheduledLabels.size();
234  } // StreamSchedule::StreamSchedule
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
Definition: Schedule.cc:691
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
actions
Definition: Schedule.cc:687
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
StreamID streamID() const
StreamContext streamContext_
areg
Definition: Schedule.cc:687
Log< level::Info, false > LogInfo
unsigned int value() const
Definition: StreamID.h:43
edm::propagate_const< TrigResPtr > results_
constexpr element_type const * get() const
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:687
TrigResConstPtr results() const
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
prealloc
Definition: Schedule.cc:687
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inline private

returns the action table

Definition at line 281 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

281 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 253 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

253 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 737 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

737  {
738  oLabelsToFill.reserve(trig_paths_.size());
739  std::transform(trig_paths_.begin(),
740  trig_paths_.end(),
741  std::back_inserter(oLabelsToFill),
742  std::bind(&Path::name, std::placeholders::_1));
743  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)
void edm::StreamSchedule::beginStream ( )

Definition at line 511 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 868 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

868  {
869  using std::placeholders::_1;
871  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
872  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
873  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
874  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:222
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void clearCounters()
Definition: Path.cc:198
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 257 of file StreamSchedule.h.

References streamContext_.

257 { return streamContext_; }
StreamContext streamContext_
void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 531 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_
void edm::StreamSchedule::endStream ( )

Definition at line 513 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 482 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

488  {
489  PathWorkers tmpworkers;
490  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
491 
492  if (!tmpworkers.empty()) {
493  //EndPaths are not supposed to stop if SkipEvent type exception happens
494  end_paths_.emplace_back(bitpos,
495  name,
496  tmpworkers,
497  TrigResPtr(),
498  actionTable(),
499  actReg_,
501  nullptr,
503  } else {
504  empty_end_paths_.push_back(bitpos);
505  }
506  for (WorkerInPath const& workerInPath : tmpworkers) {
507  addToAllWorkers(workerInPath.getWorker());
508  }
509  }
std::shared_ptr< HLTGlobalStatus > TrigResPtr
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
preg
Definition: Schedule.cc:687
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 452 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

459  {
460  PathWorkers tmpworkers;
461  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
462 
463  // an empty path will cause an extra bit that is not used
464  if (!tmpworkers.empty()) {
465  trig_paths_.emplace_back(bitpos,
466  name,
467  tmpworkers,
468  trptr,
469  actionTable(),
470  actReg_,
474  } else {
475  empty_trig_paths_.push_back(bitpos);
476  }
477  for (WorkerInPath const& workerInPath : tmpworkers) {
478  addToAllWorkers(workerInPath.getWorker());
479  }
480  }
std::vector< int > empty_trig_paths_
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
preg
Definition: Schedule.cc:687
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 381 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), mergeVDriftHistosByStation::name, edm::WorkerInPath::Normal, or, EgammaValidation_cff::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

388  {
389  vstring modnames = proc_pset.getParameter<vstring>(pathName);
390  PathWorkers tmpworkers;
391 
392  unsigned int placeInPath = 0;
393  for (auto const& name : modnames) {
394  //Modules except EDFilters are set to run concurrently by default
395  bool doNotRunConcurrently = false;
397  if (name[0] == '!') {
398  filterAction = WorkerInPath::Veto;
399  } else if (name[0] == '-' or name[0] == '+') {
400  filterAction = WorkerInPath::Ignore;
401  }
402  if (name[0] == '|' or name[0] == '+') {
403  //cms.wait was specified so do not run concurrently
404  doNotRunConcurrently = true;
405  }
406 
407  std::string moduleLabel = name;
408  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
409  moduleLabel.erase(0, 1);
410  }
411 
412  bool isTracked;
413  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
414  if (modpset == nullptr) {
415  std::string pathType("endpath");
416  if (!search_all(endPathNames, pathName)) {
417  pathType = std::string("path");
418  }
420  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
421  << "\"\n please check spelling or remove that label from the path.";
422  }
423  assert(isTracked);
424 
425  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
426  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
427  // We have a filter on an end path, and the filter is not explicitly ignored.
428  // See if the filter is allowed.
429  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
430  if (!search_all(allowed_filters, worker->description()->moduleName())) {
431  // Filter is not allowed. Ignore the result, and issue a warning.
432  filterAction = WorkerInPath::Ignore;
433  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
434  << "' with module label '" << moduleLabel << "' appears on EndPath '"
435  << pathName << "'.\n"
436  << "The return value of the filter will be ignored.\n"
437  << "To suppress this warning, either remove the filter from the endpath,\n"
438  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
439  }
440  }
441  bool runConcurrently = not doNotRunConcurrently;
442  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
443  runConcurrently = false;
444  }
445  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
446  ++placeInPath;
447  }
448 
449  out.swap(tmpworkers);
450  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
vector< string > vstring
Definition: ExoticaDQM.cc:8
processConfiguration
Definition: Schedule.cc:687
assert(be >=bs)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo &  info 
)
private

Definition at line 647 of file StreamSchedule.cc.

References mps_fire::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::category(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), alignCSCRings::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, info(), edm::EventTransitionInfo::principal(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

649  {
650  if (iExcept) {
651  // Caught exception is propagated via WaitingTaskHolder
652  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
656  if (action == exception_actions::SkipEvent) {
657  edm::printCmsExceptionWarning("SkipEvent", e);
658  *(iExcept.load()) = std::exception_ptr();
659  } else {
660  *(iExcept.load()) = std::current_exception();
661  }
662  } catch (...) {
663  *(iExcept.load()) = std::current_exception();
664  }
665  }
666 
667  if ((not iExcept) and results_->accept()) {
668  ++total_passed_;
669  }
670 
671  if (nullptr != results_inserter_.get()) {
672  // Caught exception is propagated to the caller
673  CMS_SA_ALLOW try {
674  //Even if there was an exception, we need to allow results inserter
675  // to run since some module may be waiting on its results.
676  ParentContext parentContext(&streamContext_);
677  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
678 
679  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
680  if (expt) {
681  std::rethrow_exception(expt);
682  }
683  } catch (cms::Exception& ex) {
684  if (not iExcept) {
685  if (ex.context().empty()) {
686  std::ostringstream ost;
687  ost << "Processing Event " << info.principal().id();
688  ex.addContext(ost.str());
689  }
690  iExcept.store(new std::exception_ptr(std::current_exception()));
691  }
692  } catch (...) {
693  if (not iExcept) {
694  iExcept.store(new std::exception_ptr(std::current_exception()));
695  }
696  }
697  }
698  std::exception_ptr ptr;
699  if (iExcept) {
700  ptr = *iExcept.load();
701  }
702  iWait.doneWaiting(ptr);
703  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
assert(be >=bs)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
string action
Definition: mps_fire.py:183
constexpr element_type const * get() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 705 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

705  {
706  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
707 
708  if (iExcept) {
709  //add context information to the exception and print message
710  try {
711  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
712  } catch (cms::Exception& ex) {
713  bool const cleaningUpAfterException = false;
714  if (ex.context().empty()) {
715  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
716  } else {
717  addContextAndPrintException("", ex, cleaningUpAfterException);
718  }
719  iExcept = std::current_exception();
720  }
721 
722  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
723  }
724  // Caught exception is propagated to the caller
725  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
726  if (not iExcept) {
727  iExcept = std::current_exception();
728  }
729  }
730  if (not iExcept) {
732  }
733 
734  return iExcept;
735  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 533 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

533  {
534  std::vector<ModuleDescription const*> result;
535  result.reserve(allWorkers().size());
536 
537  for (auto const& worker : allWorkers()) {
538  ModuleDescription const* p = worker->description();
539  result.push_back(p);
540  }
541  return result;
542  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
tuple result
Definition: mps_fire.py:311
tuple size
Write out results.
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 858 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

858  {
859  rep.eventSummary.totalEvents += totalEvents();
860  rep.eventSummary.totalEventsPassed += totalEventsPassed();
861  rep.eventSummary.totalEventsFailed += totalEventsFailed();
862 
863  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
864  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
865  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
866  }
string rep
Definition: cuy.py:1189
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
std::vector< std::string > const &  branchesToDeleteEarly,
edm::ProductRegistry const &  preg 
)

Definition at line 236 of file StreamSchedule.cc.

References allWorkers(), edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), CommonMethods::delta(), submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, TrackValidation_cff::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, and w().

238  {
239  // setup the list with those products actually registered for this job
240  std::multimap<std::string, Worker*> branchToReadingWorker;
241  initializeBranchToReadingWorker(branchesToDeleteEarly, preg, branchToReadingWorker);
242 
243  const std::vector<std::string> kEmpty;
244  std::map<Worker*, unsigned int> reserveSizeForWorker;
245  unsigned int upperLimitOnReadingWorker = 0;
246  unsigned int upperLimitOnIndicies = 0;
247  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
248 
249  //talk with output modules first
250  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
251  auto comm = iHolder->createOutputModuleCommunicator();
252  if (comm) {
253  if (!branchToReadingWorker.empty()) {
254  //If an OutputModule needs a product, we can't delete it early
255  // so we should remove it from our list
256  SelectedProductsForBranchType const& kept = comm->keptProducts();
257  for (auto const& item : kept[InEvent]) {
258  BranchDescription const& desc = *item.first;
259  auto found = branchToReadingWorker.equal_range(desc.branchName());
260  if (found.first != found.second) {
261  --nUniqueBranchesToDelete;
262  branchToReadingWorker.erase(found.first, found.second);
263  }
264  }
265  }
266  }
267  });
268 
269  if (branchToReadingWorker.empty()) {
270  return;
271  }
272 
273  for (auto w : allWorkers()) {
274  //determine if this module could read a branch we want to delete early
275  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
276  if (nullptr != pset) {
277  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
278  if (not branches.empty()) {
279  ++upperLimitOnReadingWorker;
280  }
281  for (auto const& branch : branches) {
282  auto found = branchToReadingWorker.equal_range(branch);
283  if (found.first != found.second) {
284  ++upperLimitOnIndicies;
285  ++reserveSizeForWorker[w];
286  if (nullptr == found.first->second) {
287  found.first->second = w;
288  } else {
289  branchToReadingWorker.insert(make_pair(found.first->first, w));
290  }
291  }
292  }
293  }
294  }
295  {
296  auto it = branchToReadingWorker.begin();
297  std::vector<std::string> unusedBranches;
298  while (it != branchToReadingWorker.end()) {
299  if (it->second == nullptr) {
300  unusedBranches.push_back(it->first);
301  //erasing the object invalidates the iterator so must advance it first
302  auto temp = it;
303  ++it;
304  branchToReadingWorker.erase(temp);
305  } else {
306  ++it;
307  }
308  }
309  if (not unusedBranches.empty()) {
310  LogWarning l("UnusedProductsForCanDeleteEarly");
311  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
312  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
313  for (auto const& n : unusedBranches) {
314  l << "\n " << n;
315  }
316  }
317  }
318  if (!branchToReadingWorker.empty()) {
319  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
320  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
321  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
322  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
323  std::string lastBranchName;
324  size_t nextOpenIndex = 0;
325  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
326  for (auto& branchAndWorker : branchToReadingWorker) {
327  if (lastBranchName != branchAndWorker.first) {
328  //have to put back the period we removed earlier in order to get the proper name
329  BranchID bid(branchAndWorker.first + ".");
330  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
331  lastBranchName = branchAndWorker.first;
332  }
333  auto found = alreadySeenWorkers.find(branchAndWorker.second);
334  if (alreadySeenWorkers.end() == found) {
335  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
336  // all the branches that might be read by this worker. However, initially we will only tell the
337  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
338  // EarlyDeleteHelper will automatically advance its internal end pointer.
339  size_t index = nextOpenIndex;
340  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
342  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
343  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
344  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
345  nextOpenIndex += nIndices;
346  } else {
347  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
348  }
349  }
350 
351  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
352  // space needed for each module
353  auto itLast = earlyDeleteHelpers_.begin();
354  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
355  if (itLast->end() != it->begin()) {
356  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
357  unsigned int delta = it->begin() - itLast->end();
358  it->shiftIndexPointers(delta);
359 
361  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
362  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
363  }
364  itLast = it;
365  }
367  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
369 
370  //now tell the paths about the deleters
371  for (auto& p : trig_paths_) {
372  p.setEarlyDeleteHelpers(alreadySeenWorkers);
373  }
374  for (auto& p : end_paths_) {
375  p.setEarlyDeleteHelpers(alreadySeenWorkers);
376  }
378  }
379  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
T w() const
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 897 of file StreamSchedule.cc.

References edm::actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

900  {
901  int bitpos = 0;
902  unsigned int indexEmpty = 0;
903  unsigned int indexOfPath = 0;
904  for (auto& pathStatusInserter : pathStatusInserters) {
905  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
906  WorkerPtr workerPtr(
907  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
908  pathStatusInserterWorkers_.emplace_back(workerPtr);
909  workerPtr->setActivityRegistry(actReg_);
910  addToAllWorkers(workerPtr.get());
911 
912  // A little complexity here because a C++ Path object is not
913  // instantiated and put into end_paths if there are no modules
914  // on the configured path.
915  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
916  ++indexEmpty;
917  } else {
918  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
919  ++indexOfPath;
920  }
921  ++bitpos;
922  }
923 
924  bitpos = 0;
925  indexEmpty = 0;
926  indexOfPath = 0;
927  for (auto& endPathStatusInserter : endPathStatusInserters) {
928  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
929  WorkerPtr workerPtr(
930  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
931  endPathStatusInserterWorkers_.emplace_back(workerPtr);
932  workerPtr->setActivityRegistry(actReg_);
933  addToAllWorkers(workerPtr.get());
934 
935  // A little complexity here because a C++ Path object is not
936  // instantiated and put into end_paths if there are no modules
937  // on the configured path.
938  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
939  ++indexEmpty;
940  } else {
941  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
942  ++indexOfPath;
943  }
944  ++bitpos;
945  }
946  }
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:687
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 787 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

789  {
790  descriptions.clear();
791  bool found = false;
792  TrigPaths::const_iterator itFound;
793 
794  if (hint < end_paths_.size()) {
795  itFound = end_paths_.begin() + hint;
796  if (itFound->name() == iEndPathLabel)
797  found = true;
798  }
799  if (!found) {
800  // if the hint did not work, do it the slow way
801  itFound = std::find_if(
802  end_paths_.begin(),
803  end_paths_.end(),
804  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
805  if (itFound != end_paths_.end())
806  found = true;
807  }
808  if (found) {
809  descriptions.reserve(itFound->size());
810  for (size_t i = 0; i < itFound->size(); ++i) {
811  descriptions.push_back(itFound->getWorker(i)->description());
812  }
813  }
814  }
std::string const & name() const
Definition: Path.h:74
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 758 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

760  {
761  descriptions.clear();
762  bool found = false;
763  TrigPaths::const_iterator itFound;
764 
765  if (hint < trig_paths_.size()) {
766  itFound = trig_paths_.begin() + hint;
767  if (itFound->name() == iPathLabel)
768  found = true;
769  }
770  if (!found) {
771  // if the hint did not work, do it the slow way
772  itFound = std::find_if(
773  trig_paths_.begin(),
774  trig_paths_.end(),
775  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
776  if (itFound != trig_paths_.end())
777  found = true;
778  }
779  if (found) {
780  descriptions.reserve(itFound->size());
781  for (size_t i = 0; i < itFound->size(); ++i) {
782  descriptions.push_back(itFound->getWorker(i)->description());
783  }
784  }
785  }
std::string const & name() const
Definition: Path.h:74
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 745 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

745  {
746  TrigPaths::const_iterator itFound = std::find_if(
747  trig_paths_.begin(),
748  trig_paths_.end(),
749  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
750  if (itFound != trig_paths_.end()) {
751  oLabelsToFill.reserve(itFound->size());
752  for (size_t i = 0; i < itFound->size(); ++i) {
753  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
754  }
755  }
756  }
std::string const & name() const
Definition: Path.h:74
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 255 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 544 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::EventTransitionInfo::principal(), edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

548  {
549  EventPrincipal& ep = info.principal();
550 
551  // Caught exception is propagated via WaitingTaskHolder
552  CMS_SA_ALLOW try {
553  this->resetAll();
554 
555  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
556 
557  Traits::setStreamContext(streamContext_, ep);
558  //a service may want to communicate with another service
559  ServiceRegistry::Operate guard(serviceToken);
560  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
561 
562  HLTPathStatus hltPathStatus(hlt::Pass, 0);
563  for (int empty_trig_path : empty_trig_paths_) {
564  results_->at(empty_trig_path) = hltPathStatus;
565  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
566  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
567  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
568  info, streamID_, ParentContext(&streamContext_), &streamContext_);
569  if (except) {
570  iTask.doneWaiting(except);
571  return;
572  }
573  }
574  for (int empty_end_path : empty_end_paths_) {
575  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
576  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
577  info, streamID_, ParentContext(&streamContext_), &streamContext_);
578  if (except) {
579  iTask.doneWaiting(except);
580  return;
581  }
582  }
583 
586 
587  ++total_events_;
588 
589  //use to give priorities on an error to ones from Paths
590  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
591  auto pathErrorPtr = pathErrorHolder.get();
592  ServiceWeakToken weakToken = serviceToken;
593  auto allPathsDone = make_waiting_task(
594  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
595  ServiceRegistry::Operate operate(weakToken.lock());
596 
597  std::exception_ptr ptr;
598  if (pathError->load()) {
599  ptr = *pathError->load();
600  delete pathError->load();
601  }
602  if ((not ptr) and iPtr) {
603  ptr = *iPtr;
604  }
605  iTask.doneWaiting(finishProcessOneEvent(ptr));
606  });
607  //The holder guarantees that if the paths finish before the loop ends
608  // that we do not start too soon. It also guarantees that the task will
609  // run under that condition.
610  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
611 
612  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
613  std::exception_ptr const* iPtr) mutable {
614  ServiceRegistry::Operate operate(weakToken.lock());
615 
616  if (iPtr) {
617  //this is used to prioritize this error over one
618  // that happens in EndPath or Accumulate
619  pathErrorPtr->store(new std::exception_ptr(*iPtr));
620  }
621  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
622  });
623 
624  //The holder guarantees that if the paths finish before the loop ends
625  // that we do not start too soon. It also guarantees that the task will
626  // run under that condition.
627  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
628 
629  //start end paths first so on single threaded the paths will run first
630  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
631  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
632  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
633  }
634 
635  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
636  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
637  }
638 
639  ParentContext parentContext(&streamContext_);
640  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
641  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
642  } catch (...) {
643  iTask.doneWaiting(std::current_exception());
644  }
645  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 371 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, gpuClustering::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

374  {
375  auto const& principal = transitionInfo.principal();
376  T::setStreamContext(streamContext_, principal);
377 
378  auto id = principal.id();
379  ServiceWeakToken weakToken = token;
380  auto doneTask = make_waiting_task(
381  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
382  std::exception_ptr excpt;
383  if (iPtr) {
384  excpt = *iPtr;
385  //add context information to the exception and print message
386  try {
387  convertException::wrap([&]() { std::rethrow_exception(excpt); });
388  } catch (cms::Exception& ex) {
389  //TODO: should add the transition type info
390  std::ostringstream ost;
391  if (ex.context().empty()) {
392  ost << "Processing " << T::transitionName() << " " << id;
393  }
394  ServiceRegistry::Operate op(weakToken.lock());
395  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
396  excpt = std::current_exception();
397  }
398 
399  ServiceRegistry::Operate op(weakToken.lock());
400  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
401  }
402  // Caught exception is propagated via WaitingTaskHolder
403  CMS_SA_ALLOW try {
404  ServiceRegistry::Operate op(weakToken.lock());
405  T::postScheduleSignal(actReg_.get(), &streamContext_);
406  } catch (...) {
407  if (not excpt) {
408  excpt = std::current_exception();
409  }
410  }
411  iHolder.doneWaiting(excpt);
412  });
413 
414  auto task = make_functor_task(
415  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
416  auto token = weakToken.lock();
418  // Caught exception is propagated via WaitingTaskHolder
419  CMS_SA_ALLOW try {
420  T::preScheduleSignal(actReg_.get(), &streamContext_);
421 
423  } catch (...) {
424  h.doneWaiting(std::current_exception());
425  return;
426  }
427 
428  for (auto& p : end_paths_) {
429  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
430  }
431 
432  for (auto& p : trig_paths_) {
433  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
434  }
435 
437  });
438 
439  if (streamID_.value() == 0) {
440  //Enqueueing will start another thread if there is only
441  // one thread in the job. Having stream == 0 use spawn
442  // avoids starting up another thread when there is only one stream.
443  iHolder.group()->run([task]() {
444  TaskSentry s{task};
445  task->execute();
446  });
447  } else {
448  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
449  arena.enqueue([task]() {
450  TaskSentry s{task};
451  task->execute();
452  });
453  }
454  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
uint16_t *__restrict__ id
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
unsigned int value() const
Definition: StreamID.h:43
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

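Replace the module used by the worker with label iLabel, using the module holder iMod (via iMod->replaceModuleFor()), and then call beginStream() on that worker. Does nothing if no worker has the label iLabel.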

Definition at line 515 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

515  {
516  Worker* found = nullptr;
517  for (auto const& worker : allWorkers()) {
518  if (worker->description()->moduleLabel() == iLabel) {
519  found = worker;
520  break;
521  }
522  }
523  if (nullptr == found) {
524  return;
525  }
526 
527  iMod->replaceModuleFor(found);
528  found->beginStream(streamID_, streamContext_);
529  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 365 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

365  {
366  Service<JobReport> reportSvc;
367  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
368  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 876 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

876  {
877  skippingEvent_ = false;
878  results_->reset();
879  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 883 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

883  {
884  //must be sure we have cleared the count first
885  for (auto& count : earlyDeleteBranchToCount_) {
886  count.count = 0;
887  }
888  //now reset based on how many helpers use that branch
890  ++(earlyDeleteBranchToCount_[index].count);
891  }
892  for (auto& helper : earlyDeleteHelpers_) {
893  helper.reset();
894  }
895  }
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 319 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

319 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 320 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

320 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 198 of file StreamSchedule.h.

References streamID_.

198 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 225 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

225 { return total_events_; }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())

Definition at line 233 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

233 { return totalEvents() - totalEventsPassed(); }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 229 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

229 { return total_passed_; }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 344 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 351 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 339 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 338 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

TrigPaths edm::StreamSchedule::end_paths_
private
std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 334 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 358 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 333 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 330 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 332 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

int edm::StreamSchedule::total_passed_
private

Definition at line 357 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private