CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr
< HLTGlobalStatus const > 
TrigResConstPtr
 
typedef std::shared_ptr
< HLTGlobalStatus > 
TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void deleteModule (std::string const &iLabel)
 Delete the module with label iLabel. More...
 
void endStream ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr & results ()
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
std::vector
< edm::propagate_const
< WorkerPtr > > 
endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector
< edm::propagate_const
< WorkerPtr > > 
pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 152 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 160 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 156 of file StreamSchedule.h.

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 164 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 155 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 158 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 157 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 154 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 159 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 162 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
BranchIDListHelper &  branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 138 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), cms::cuda::assert(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, DBoxMetadataHelper::set_difference(), trig_paths_, edm::StreamID::value(), and workerManager_.

154  : workerManager_(modReg, areg, actions),
155  actReg_(areg),
156  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
158  trig_paths_(),
159  end_paths_(),
160  total_events_(),
161  total_passed_(),
164  streamContext_(streamID_, processContext),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description()->moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
Definition: Schedule.cc:691
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
assert(be >=bs)
actions
Definition: Schedule.cc:687
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
StreamID streamID() const
StreamContext streamContext_
areg
Definition: Schedule.cc:687
Log< level::Info, false > LogInfo
unsigned int value() const
Definition: StreamID.h:43
edm::propagate_const< TrigResPtr > results_
constexpr element_type const * get() const
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
preg
Definition: Schedule.cc:687
TrigResConstPtr results() const
std::vector< std::string > set_difference(std::vector< std::string > const &v1, std::vector< std::string > const &v2)
prealloc
Definition: Schedule.cc:687
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 278 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

278 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

Definition at line 899 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), makePathStatusInserters(), and StreamSchedule().

const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 250 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

250 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 755 of file StreamSchedule.cc.

References edm::Path::name(), HcalDetIdTransform::transform(), and trig_paths_.

755  {
756  oLabelsToFill.reserve(trig_paths_.size());
757  std::transform(trig_paths_.begin(),
758  trig_paths_.end(),
759  std::back_inserter(oLabelsToFill),
760  std::bind(&Path::name, std::placeholders::_1));
761  }
std::string const & name() const
Definition: Path.h:74
unsigned transform(const HcalDetId &id, unsigned transformCode)
void edm::StreamSchedule::beginStream ( )

Definition at line 529 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 886 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

886  {
887  using std::placeholders::_1;
889  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
890  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
891  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
892  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:222
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void clearCounters()
Definition: Path.cc:198
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 254 of file StreamSchedule.h.

References streamContext_.

254 { return streamContext_; }
StreamContext streamContext_
void edm::StreamSchedule::deleteModule ( std::string const &  iLabel)

Delete the module with label iLabel.

Definition at line 549 of file StreamSchedule.cc.

References edm::WorkerManager::deleteModuleIfExists(), and workerManager_.

void deleteModuleIfExists(std::string const &moduleLabel)
WorkerManager workerManager_
void edm::StreamSchedule::endStream ( )

Definition at line 531 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 500 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

506  {
507  PathWorkers tmpworkers;
508  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
509 
510  if (!tmpworkers.empty()) {
511  //EndPaths are not supposed to stop if SkipEvent type exception happens
512  end_paths_.emplace_back(bitpos,
513  name,
514  tmpworkers,
515  TrigResPtr(),
516  actionTable(),
517  actReg_,
519  nullptr,
521  } else {
522  empty_end_paths_.push_back(bitpos);
523  }
524  for (WorkerInPath const& workerInPath : tmpworkers) {
525  addToAllWorkers(workerInPath.getWorker());
526  }
527  }
std::shared_ptr< HLTGlobalStatus > TrigResPtr
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
preg
Definition: Schedule.cc:687
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 470 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

477  {
478  PathWorkers tmpworkers;
479  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
480 
481  // an empty path will cause an extra bit that is not used
482  if (!tmpworkers.empty()) {
483  trig_paths_.emplace_back(bitpos,
484  name,
485  tmpworkers,
486  trptr,
487  actionTable(),
488  actReg_,
492  } else {
493  empty_trig_paths_.push_back(bitpos);
494  }
495  for (WorkerInPath const& workerInPath : tmpworkers) {
496  addToAllWorkers(workerInPath.getWorker());
497  }
498  }
std::vector< int > empty_trig_paths_
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
preg
Definition: Schedule.cc:687
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 399 of file StreamSchedule.cc.

References cms::cuda::assert(), edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), mergeVDriftHistosByStation::name, edm::WorkerInPath::Normal, or, EgammaValidation_cff::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

406  {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
412  //Modules except EDFilters are set to run concurrently by default
413  bool doNotRunConcurrently = false;
415  if (name[0] == '!') {
416  filterAction = WorkerInPath::Veto;
417  } else if (name[0] == '-' or name[0] == '+') {
418  filterAction = WorkerInPath::Ignore;
419  }
420  if (name[0] == '|' or name[0] == '+') {
421  //cms.wait was specified so do not run concurrently
422  doNotRunConcurrently = true;
423  }
424 
425  std::string moduleLabel = name;
426  if (filterAction != WorkerInPath::Normal or name[0] == '|') {
427  moduleLabel.erase(0, 1);
428  }
429 
430  bool isTracked;
431  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
432  if (modpset == nullptr) {
433  std::string pathType("endpath");
434  if (!search_all(endPathNames, pathName)) {
435  pathType = std::string("path");
436  }
438  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
439  << "\"\n please check spelling or remove that label from the path.";
440  }
441  assert(isTracked);
442 
443  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
444  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
445  // We have a filter on an end path, and the filter is not explicitly ignored.
446  // See if the filter is allowed.
447  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
448  if (!search_all(allowed_filters, worker->description()->moduleName())) {
449  // Filter is not allowed. Ignore the result, and issue a warning.
450  filterAction = WorkerInPath::Ignore;
451  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description()->moduleName()
452  << "' with module label '" << moduleLabel << "' appears on EndPath '"
453  << pathName << "'.\n"
454  << "The return value of the filter will be ignored.\n"
455  << "To suppress this warning, either remove the filter from the endpath,\n"
456  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
457  }
458  }
459  bool runConcurrently = not doNotRunConcurrently;
460  if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
461  runConcurrently = false;
462  }
463  tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
464  ++placeInPath;
465  }
466 
467  out.swap(tmpworkers);
468  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
vector< string > vstring
Definition: ExoticaDQM.cc:8
processConfiguration
Definition: Schedule.cc:687
assert(be >=bs)
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
prealloc
Definition: Schedule.cc:687
void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventTransitionInfo &  info 
)
private

Definition at line 665 of file StreamSchedule.cc.

References mps_fire::action, actionTable(), cms::Exception::addContext(), cms::cuda::assert(), cms::Exception::category(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), alignCSCRings::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, info(), edm::EventTransitionInfo::principal(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

667  {
668  if (iExcept) {
669  // Caught exception is propagated via WaitingTaskHolder
670  CMS_SA_ALLOW try { std::rethrow_exception(*(iExcept.load())); } catch (cms::Exception& e) {
674  if (action == exception_actions::SkipEvent) {
675  edm::printCmsExceptionWarning("SkipEvent", e);
676  *(iExcept.load()) = std::exception_ptr();
677  } else {
678  *(iExcept.load()) = std::current_exception();
679  }
680  } catch (...) {
681  *(iExcept.load()) = std::current_exception();
682  }
683  }
684 
685  if ((not iExcept) and results_->accept()) {
686  ++total_passed_;
687  }
688 
689  if (nullptr != results_inserter_.get()) {
690  // Caught exception is propagated to the caller
691  CMS_SA_ALLOW try {
692  //Even if there was an exception, we need to allow results inserter
693  // to run since some module may be waiting on its results.
694  ParentContext parentContext(&streamContext_);
695  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
696 
697  auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
698  if (expt) {
699  std::rethrow_exception(expt);
700  }
701  } catch (cms::Exception& ex) {
702  if (not iExcept) {
703  if (ex.context().empty()) {
704  std::ostringstream ost;
705  ost << "Processing Event " << info.principal().id();
706  ex.addContext(ost.str());
707  }
708  iExcept.store(new std::exception_ptr(std::current_exception()));
709  }
710  } catch (...) {
711  if (not iExcept) {
712  iExcept.store(new std::exception_ptr(std::current_exception()));
713  }
714  }
715  }
716  std::exception_ptr ptr;
717  if (iExcept) {
718  ptr = *iExcept.load();
719  }
720  iWait.doneWaiting(ptr);
721  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
assert(be >=bs)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
string action
Definition: mps_fire.py:183
constexpr element_type const * get() const
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 723 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

723  {
724  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
725 
726  if (iExcept) {
727  //add context information to the exception and print message
728  try {
729  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
730  } catch (cms::Exception& ex) {
731  bool const cleaningUpAfterException = false;
732  if (ex.context().empty()) {
733  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
734  } else {
735  addContextAndPrintException("", ex, cleaningUpAfterException);
736  }
737  iExcept = std::current_exception();
738  }
739 
740  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
741  }
742  // Caught exception is propagated to the caller
743  CMS_SA_ALLOW try { Traits::postScheduleSignal(actReg_.get(), &streamContext_); } catch (...) {
744  if (not iExcept) {
745  iExcept = std::current_exception();
746  }
747  }
748  if (not iExcept) {
750  }
751 
752  return iExcept;
753  }
#define CMS_SA_ALLOW
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 551 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

551  {
552  std::vector<ModuleDescription const*> result;
553  result.reserve(allWorkers().size());
554 
555  for (auto const& worker : allWorkers()) {
556  ModuleDescription const* p = worker->description();
557  result.push_back(p);
558  }
559  return result;
560  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
tuple result
Definition: mps_fire.py:311
tuple size
Write out results.
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 876 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

// Adds this stream's event counters to rep.eventSummary (using +=, so any
// values already present in rep are kept) and then passes the trigger paths,
// end paths and workers to fill_summary together with the matching
// fill*Summary function.
// NOTE(review): fill_summary is not shown here; whether it appends to or
// merges into the existing summaries should be confirmed at its definition.
876  {
877  rep.eventSummary.totalEvents += totalEvents();
878  rep.eventSummary.totalEventsPassed += totalEventsPassed();
879  rep.eventSummary.totalEventsFailed += totalEventsFailed();
880 
881  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
882  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
883  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
884  }
string rep
Definition: cuy.py:1189
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 243 of file StreamSchedule.cc.

References allWorkers(), edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), CommonMethods::delta(), submitPVResolutionJobs::desc, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, cmsLHEtoEOSManager::l, dqmiodumpmetadata::n, AlCaHLTBitMon_ParallelJobs::p, TrackValidation_cff::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, and w.

Referenced by StreamSchedule().

246  {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description()->parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
Log< level::Warning, false > LogWarning
preg
Definition: Schedule.cc:687
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 915 of file StreamSchedule.cc.

References edm::actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

// For every entry of pathStatusInserters and endPathStatusInserters: wraps
// the inserter in a new WorkerT, stores that worker in the corresponding
// *StatusInserterWorkers_ vector, gives it the activity registry, registers
// it through addToAllWorkers, and attaches it to the matching Path object
// unless the path at that position is recorded as empty.
918  {
// bitpos counts every inserter visited; indexEmpty walks empty_trig_paths_;
// indexOfPath walks trig_paths_ (only advanced for non-empty paths).
919  int bitpos = 0;
920  unsigned int indexEmpty = 0;
921  unsigned int indexOfPath = 0;
922  for (auto& pathStatusInserter : pathStatusInserters) {
923  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
924  WorkerPtr workerPtr(
925  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
926  pathStatusInserterWorkers_.emplace_back(workerPtr);
927  workerPtr->setActivityRegistry(actReg_);
928  addToAllWorkers(workerPtr.get());
929 
930  // A little complexity here because a C++ Path object is not
931  // instantiated and put into trig_paths_ if there are no modules
932  // on the configured path.
// When bitpos matches the next recorded empty position, only indexEmpty
// advances and no Path is touched.
933  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
934  ++indexEmpty;
935  } else {
936  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
937  ++indexOfPath;
938  }
939  ++bitpos;
940  }
941 
// Same bookkeeping, restarted from zero, for the end paths.
942  bitpos = 0;
943  indexEmpty = 0;
944  indexOfPath = 0;
945  for (auto& endPathStatusInserter : endPathStatusInserters) {
946  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
947  WorkerPtr workerPtr(
948  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
949  endPathStatusInserterWorkers_.emplace_back(workerPtr);
950  workerPtr->setActivityRegistry(actReg_);
951  addToAllWorkers(workerPtr.get());
952 
953  // A little complexity here because a C++ Path object is not
954  // instantiated and put into end_paths if there are no modules
955  // on the configured path.
956  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
957  ++indexEmpty;
958  } else {
// Unlike the trigger-path loop, the inserter argument is nullptr here; only
// the worker pointer is handed to the end path.
959  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
960  ++indexOfPath;
961  }
962  ++bitpos;
963  }
964  }
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:687
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
constexpr T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 805 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, mps_fire::i, and edm::Path::name().

// Fills descriptions with the ModuleDescription pointer of every worker on
// the end path named iEndPathLabel. descriptions is always cleared first and
// is left empty when no end path carries that name. hint is tried as an
// index into end_paths_ before falling back to a linear search.
807  {
808  descriptions.clear();
809  bool found = false;
810  TrigPaths::const_iterator itFound;
811 
// Fast path: accept the hinted position only if it is in range and the path
// stored there has the requested name.
812  if (hint < end_paths_.size()) {
813  itFound = end_paths_.begin() + hint;
814  if (itFound->name() == iEndPathLabel)
815  found = true;
816  }
817  if (!found) {
818  // if the hint did not work, do it the slow way
819  itFound = std::find_if(
820  end_paths_.begin(),
821  end_paths_.end(),
822  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
823  if (itFound != end_paths_.end())
824  found = true;
825  }
826  if (found) {
// One description per worker slot of the path, in index order.
827  descriptions.reserve(itFound->size());
828  for (size_t i = 0; i < itFound->size(); ++i) {
829  descriptions.push_back(itFound->getWorker(i)->description());
830  }
831  }
832  }
std::string const & name() const
Definition: Path.h:74
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 776 of file StreamSchedule.cc.

References newFWLiteAna::found, mps_fire::i, edm::Path::name(), and trig_paths_.

// Fills descriptions with the ModuleDescription pointer of every worker on
// the trigger path named iPathLabel. descriptions is always cleared first
// and is left empty when no trigger path carries that name. hint is tried as
// an index into trig_paths_ before falling back to a linear search.
778  {
779  descriptions.clear();
780  bool found = false;
781  TrigPaths::const_iterator itFound;
782 
// Fast path: accept the hinted position only if it is in range and the path
// stored there has the requested name.
783  if (hint < trig_paths_.size()) {
784  itFound = trig_paths_.begin() + hint;
785  if (itFound->name() == iPathLabel)
786  found = true;
787  }
788  if (!found) {
789  // if the hint did not work, do it the slow way
790  itFound = std::find_if(
791  trig_paths_.begin(),
792  trig_paths_.end(),
793  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
794  if (itFound != trig_paths_.end())
795  found = true;
796  }
797  if (found) {
// One description per worker slot of the path, in index order.
798  descriptions.reserve(itFound->size());
799  for (size_t i = 0; i < itFound->size(); ++i) {
800  descriptions.push_back(itFound->getWorker(i)->description());
801  }
802  }
803  }
std::string const & name() const
Definition: Path.h:74
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 763 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

// Looks up the trigger path named iPathLabel by linear search and appends
// the module label of each of its workers, in index order, to oLabelsToFill.
// oLabelsToFill is not cleared, and nothing is appended when no trigger path
// has that name.
763  {
764  TrigPaths::const_iterator itFound = std::find_if(
765  trig_paths_.begin(),
766  trig_paths_.end(),
767  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
768  if (itFound != trig_paths_.end()) {
769  oLabelsToFill.reserve(itFound->size());
770  for (size_t i = 0; i < itFound->size(); ++i) {
771  oLabelsToFill.push_back(itFound->getWorker(i)->description()->moduleLabel());
772  }
773  }
774  }
std::string const & name() const
Definition: Path.h:74
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 252 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventTransitionInfo info,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 562 of file StreamSchedule.cc.

References actReg_, CMS_SA_ALLOW, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, finishedPaths(), finishProcessOneEvent(), edm::WaitingTaskHolder::group(), info(), edm::ServiceWeakToken::lock(), edm::make_waiting_task(), eostools::move(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::EventTransitionInfo::principal(), edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), edm::WorkerManager::setupResolvers(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

566  {
567  EventPrincipal& ep = info.principal();
568 
569  // Caught exception is propagated via WaitingTaskHolder
570  CMS_SA_ALLOW try {
571  this->resetAll();
572 
573  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
574 
575  Traits::setStreamContext(streamContext_, ep);
576  //a service may want to communicate with another service
577  ServiceRegistry::Operate guard(serviceToken);
578  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
579 
580  HLTPathStatus hltPathStatus(hlt::Pass, 0);
581  for (int empty_trig_path : empty_trig_paths_) {
582  results_->at(empty_trig_path) = hltPathStatus;
583  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
584  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
585  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
586  info, streamID_, ParentContext(&streamContext_), &streamContext_);
587  if (except) {
588  iTask.doneWaiting(except);
589  return;
590  }
591  }
592  for (int empty_end_path : empty_end_paths_) {
593  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
594  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
595  info, streamID_, ParentContext(&streamContext_), &streamContext_);
596  if (except) {
597  iTask.doneWaiting(except);
598  return;
599  }
600  }
601 
604 
605  ++total_events_;
606 
607  //use to give priorities on an error to ones from Paths
608  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
609  auto pathErrorPtr = pathErrorHolder.get();
610  ServiceWeakToken weakToken = serviceToken;
611  auto allPathsDone = make_waiting_task(
612  [iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
613  ServiceRegistry::Operate operate(weakToken.lock());
614 
615  std::exception_ptr ptr;
616  if (pathError->load()) {
617  ptr = *pathError->load();
618  delete pathError->load();
619  }
620  if ((not ptr) and iPtr) {
621  ptr = *iPtr;
622  }
623  iTask.doneWaiting(finishProcessOneEvent(ptr));
624  });
625  //The holder guarantees that if the paths finish before the loop ends
626  // that we do not start too soon. It also guarantees that the task will
627  // run under that condition.
628  WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);
629 
630  auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
631  std::exception_ptr const* iPtr) mutable {
632  ServiceRegistry::Operate operate(weakToken.lock());
633 
634  if (iPtr) {
635  //this is used to prioritize this error over one
636  // that happens in EndPath or Accumulate
637  pathErrorPtr->store(new std::exception_ptr(*iPtr));
638  }
639  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), transitionInfo);
640  });
641 
642  //The holder guarantees that if the paths finish before the loop ends
643  // that we do not start too soon. It also guarantees that the task will
644  // run under that condition.
645  WaitingTaskHolder taskHolder(*iTask.group(), pathsDone);
646 
647  //start end paths first so on single threaded the paths will run first
648  WaitingTaskHolder hAllPathsDone(*iTask.group(), allPathsDone);
649  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
650  it->processOneOccurrenceAsync(hAllPathsDone, info, serviceToken, streamID_, &streamContext_);
651  }
652 
653  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
654  it->processOneOccurrenceAsync(taskHolder, info, serviceToken, streamID_, &streamContext_);
655  }
656 
657  ParentContext parentContext(&streamContext_);
658  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
659  hAllPathsDone, info, serviceToken, streamID_, parentContext, &streamContext_);
660  } catch (...) {
661  iTask.doneWaiting(std::current_exception());
662  }
663  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void processAccumulatorsAsync(WaitingTaskHolder, typename T::TransitionInfoType const &, ServiceToken const &, StreamID, ParentContext const &, typename T::Context const *)
void setupOnDemandSystem(EventTransitionInfo const &)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:18
StreamContext streamContext_
def move
Definition: eostools.py:511
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
void setupResolvers(Principal &principal)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::TransitionInfoType &  transitionInfo,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 372 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), CMS_SA_ALLOW, cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), end_paths_, edm::ExceptionFromThisContext, edm::WaitingTaskHolder::group(), h, gpuClustering::id, info(), edm::ServiceWeakToken::lock(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::WorkerManager::processOneOccurrenceAsync(), edm::WorkerManager::resetAll(), alignCSCRings::s, streamContext_, streamID_, unpackBuffers-CaloStage2::token, trig_paths_, edm::StreamID::value(), workerManager_, and edm::convertException::wrap().

375  {
376  auto const& principal = transitionInfo.principal();
377  T::setStreamContext(streamContext_, principal);
378 
379  auto id = principal.id();
380  ServiceWeakToken weakToken = token;
381  auto doneTask = make_waiting_task(
382  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
383  std::exception_ptr excpt;
384  if (iPtr) {
385  excpt = *iPtr;
386  //add context information to the exception and print message
387  try {
388  convertException::wrap([&]() { std::rethrow_exception(excpt); });
389  } catch (cms::Exception& ex) {
390  //TODO: should add the transition type info
391  std::ostringstream ost;
392  if (ex.context().empty()) {
393  ost << "Processing " << T::transitionName() << " " << id;
394  }
395  ServiceRegistry::Operate op(weakToken.lock());
396  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
397  excpt = std::current_exception();
398  }
399 
400  ServiceRegistry::Operate op(weakToken.lock());
401  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
402  }
403  // Caught exception is propagated via WaitingTaskHolder
404  CMS_SA_ALLOW try {
405  ServiceRegistry::Operate op(weakToken.lock());
406  T::postScheduleSignal(actReg_.get(), &streamContext_);
407  } catch (...) {
408  if (not excpt) {
409  excpt = std::current_exception();
410  }
411  }
412  iHolder.doneWaiting(excpt);
413  });
414 
415  auto task = make_functor_task(
416  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
417  auto token = weakToken.lock();
419  // Caught exception is propagated via WaitingTaskHolder
420  CMS_SA_ALLOW try {
421  T::preScheduleSignal(actReg_.get(), &streamContext_);
422 
424  } catch (...) {
425  h.doneWaiting(std::current_exception());
426  return;
427  }
428 
429  for (auto& p : end_paths_) {
430  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
431  }
432 
433  for (auto& p : trig_paths_) {
434  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
435  }
436 
438  });
439 
440  if (streamID_.value() == 0) {
441  //Enqueueing will start another thread if there is only
442  // one thread in the job. Having stream == 0 use spawn
443  // avoids starting up another thread when there is only one stream.
444  iHolder.group()->run([task]() {
445  TaskSentry s{task};
446  task->execute();
447  });
448  } else {
449  tbb::task_arena arena{tbb::task_arena::attach()};
450  arena.enqueue([task]() {
451  TaskSentry s{task};
452  task->execute();
453  });
454  }
455  }
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
uint16_t *__restrict__ id
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
unsigned int value() const
Definition: StreamID.h:43
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

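Replace the module used by the worker whose module label is iLabel with the one provided by iMod, then call beginStream on that worker. Does nothing if no worker has that label.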

Definition at line 533 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

// Finds the first worker in allWorkers() whose module label equals iLabel.
// If none exists the function returns without doing anything; otherwise the
// worker is handed to iMod->replaceModuleFor() and beginStream is then called
// on it with this schedule's streamID_ and streamContext_.
533  {
534  Worker* found = nullptr;
535  for (auto const& worker : allWorkers()) {
536  if (worker->description()->moduleLabel() == iLabel) {
537  found = worker;
538  break;
539  }
540  }
// Unknown label: silently ignore the request.
541  if (nullptr == found) {
542  return;
543  }
544 
545  iMod->replaceModuleFor(found);
546  found->beginStream(streamID_, streamContext_);
547  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 366 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

// Passes the run and event numbers taken from ep.id() to the JobReport
// service's reportSkippedEvent.
366  {
367  Service<JobReport> reportSvc;
368  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
369  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 894 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

// Sets skippingEvent_ to false and calls reset() on the object held by
// results_.
894  {
895  skippingEvent_ = false;
896  results_->reset();
897  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 901 of file StreamSchedule.cc.

References submitPVResolutionJobs::count, earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

901  {
902  //must be sure we have cleared the count first
903  for (auto& count : earlyDeleteBranchToCount_) {
904  count.count = 0;
905  }
906  //now reset based on how many helpers use that branch
908  ++(earlyDeleteBranchToCount_[index].count);
909  }
910  for (auto& helper : earlyDeleteHelpers_) {
911  helper.reset();
912  }
913  }
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 320 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

// Const overload: returns the pointer obtained from results_ through
// get_underlying_safe.
320 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 321 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

// Non-const overload: returns a reference to the pointer obtained from
// results_ through get_underlying_safe.
321 { return get_underlying_safe(results_); }
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References streamID_.

// Returns the stored stream identifier, streamID_.
199 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

// Returns the total_events_ counter.
226 { return total_events_; }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 234 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

// Not stored separately: computed as totalEvents() minus totalEventsPassed().
234 { return totalEvents() - totalEventsPassed(); }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

// Returns the total_passed_ counter.
230 { return total_passed_; }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 345 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 352 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 340 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 339 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

TrigPaths edm::StreamSchedule::end_paths_
private
std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 335 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 334 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 331 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), resetAll(), and results().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 333 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 363 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 357 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

int edm::StreamSchedule::total_passed_
private

Definition at line 358 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private