CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStream (typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 
template<typename T >
void runEndPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 
template<typename T >
bool runTriggerPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 147 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 155 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 151 of file StreamSchedule.h.

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 159 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 150 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 153 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 152 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 149 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 154 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 157 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter >  inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry >  modReg,
ParameterSet &  proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry &  pregistry,
BranchIDListHelper &  branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry >  areg,
std::shared_ptr< ProcessConfiguration >  processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 135 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), diffTwoXMLs::label, makePathStatusInserters(), number_of_unscheduled_modules_, geometryDiff::opts, results(), results_inserter_, edm::WorkerManager::setOnDemandProducts(), trig_paths_, edm::StreamID::value(), and workerManager_.

149  :
150  workerManager_(modReg,areg, actions),
151  actReg_(areg),
152  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
153  results_inserter_(),
154  trig_paths_(),
155  end_paths_(),
156  total_events_(),
157  total_passed_(),
158  number_of_unscheduled_modules_(0),
159  streamID_(streamID),
160  streamContext_(streamID_, processContext),
161  endpathsAreActive_(true),
162  skippingEvent_(false){
163 
164  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
165  bool hasPath = false;
166  std::vector<std::string> const& pathNames = tns.getTrigPaths();
167  std::vector<std::string> const& endPathNames = tns.getEndPaths();
168 
169  int trig_bitpos = 0;
170  trig_paths_.reserve(pathNames.size());
171  for (auto const& trig_name : pathNames) {
172  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
173  ++trig_bitpos;
174  hasPath = true;
175  }
176 
177  if (hasPath) {
178  // the results inserter stands alone
179  inserter->setTrigResultForStream(streamID.value(), results());
180 
181  results_inserter_ = makeInserter(actions, actReg_, inserter);
182  addToAllWorkers(results_inserter_.get());
183  }
184 
185  // fill normal endpaths
186  int bitpos = 0;
187  end_paths_.reserve(endPathNames.size());
188  for (auto const& end_path_name : endPathNames) {
189  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
190  ++bitpos;
191  }
192 
193  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
194 
195  //See if all modules were used
196  std::set<std::string> usedWorkerLabels;
197  for (auto const& worker : allWorkers()) {
198  usedWorkerLabels.insert(worker->description().moduleLabel());
199  }
200  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
201  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
202  std::vector<std::string> unusedLabels;
203  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
204  usedWorkerLabels.begin(), usedWorkerLabels.end(),
205  back_inserter(unusedLabels));
206  std::set<std::string> unscheduledLabels;
207  std::vector<std::string> shouldBeUsedLabels;
208  if (!unusedLabels.empty()) {
209  //Need to
210  // 1) create worker
211  // 2) if it is a WorkerT<EDProducer>, add it to our list
212  // 3) hand list to our delayed reader
213  for (auto const& label : unusedLabels) {
214  bool isTracked;
215  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
216  assert(isTracked);
217  assert(modulePSet != nullptr);
218  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
219  }
220  if (!shouldBeUsedLabels.empty()) {
221  std::ostringstream unusedStream;
222  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
223  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
224  itLabelEnd = shouldBeUsedLabels.end();
225  itLabel != itLabelEnd;
226  ++itLabel) {
227  unusedStream << ",'" << *itLabel << "'";
228  }
229  LogInfo("path")
230  << "The following module labels are not assigned to any path:\n"
231  << unusedStream.str()
232  << "\n";
233  }
234  }
235  if (!unscheduledLabels.empty()) {
236  number_of_unscheduled_modules_=unscheduledLabels.size();
237  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
238  }
239 
240 
241  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
242 
243  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
unsigned int value() const
Definition: StreamID.h:46
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
TrigResConstPtr results() const
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 294 of file StreamSchedule.h.

References dataset::name, geometryDiff::opts, MillePedeFileConverter_cfg::out, AlCaHLTBitMon_QueryRunRegistry::string, and w.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

294  {
295  return workerManager_.actionTable();
296  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:79
void edm::StreamSchedule::addToAllWorkers ( Worker *  w)
private

Definition at line 883 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), makePathStatusInserters(), and StreamSchedule().

883  {
884  workerManager_.addToAllWorkers(w);
885  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 261 of file StreamSchedule.h.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

261  {
262  return workerManager_.allWorkers();
263  }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:75
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 713 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

713  {
714  oLabelsToFill.reserve(trig_paths_.size());
715  std::transform(trig_paths_.begin(),
716  trig_paths_.end(),
717  std::back_inserter(oLabelsToFill),
718  std::bind(&Path::name, std::placeholders::_1));
719  }
std::string const & name() const
Definition: Path.h:77
void edm::StreamSchedule::beginStream ( )

Definition at line 499 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

499  {
500  workerManager_.beginStream(streamID_, streamContext_);
501  }
WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 868 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

868  {
869  using std::placeholders::_1;
870  total_events_ = total_passed_ = 0;
871  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
872  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
873  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
874  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:162
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void clearCounters()
Definition: Path.cc:171
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 269 of file StreamSchedule.h.

269 { return streamContext_;}
StreamContext streamContext_
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 797 of file StreamSchedule.cc.

References endpathsAreActive_.

797  {
798  endpathsAreActive_ = active;
799  }
volatile bool endpathsAreActive_
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 802 of file StreamSchedule.cc.

References endpathsAreActive_.

802  {
803  return endpathsAreActive_;
804  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 503 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

503  {
504  workerManager_.endStream(streamID_, streamContext_);
505  }
void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 479 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

484  {
485  PathWorkers tmpworkers;
486  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
487 
488  if (!tmpworkers.empty()) {
489  //EndPaths are not supposed to stop if SkipEvent type exception happens
490  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
491  } else {
492  empty_end_paths_.push_back(bitpos);
493  }
494  for (WorkerInPath const& workerInPath : tmpworkers) {
495  addToAllWorkers(workerInPath.getWorker());
496  }
497  }
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void edm::StreamSchedule::fillTrigPath ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 459 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

464  {
465  PathWorkers tmpworkers;
466  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
467 
468  // an empty path will cause an extra bit that is not used
469  if (!tmpworkers.empty()) {
470  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
471  } else {
472  empty_trig_paths_.push_back(bitpos);
473  }
474  for (WorkerInPath const& workerInPath : tmpworkers) {
475  addToAllWorkers(workerInPath.getWorker());
476  }
477  }
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::fillWorkers ( ParameterSet &  proc_pset,
ProductRegistry &  preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers &  out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 402 of file StreamSchedule.cc.

References edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), dataset::name, edm::WorkerInPath::Normal, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

409  {
410  vstring modnames = proc_pset.getParameter<vstring>(pathName);
411  PathWorkers tmpworkers;
412 
413  unsigned int placeInPath = 0;
414  for (auto const& name : modnames) {
415 
416  WorkerInPath::FilterAction filterAction = WorkerInPath::Normal;
417  if (name[0] == '!') filterAction = WorkerInPath::Veto;
418  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
419 
420  std::string moduleLabel = name;
421  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
422 
423  bool isTracked;
424  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
425  if (modpset == 0) {
426  std::string pathType("endpath");
427  if (!search_all(endPathNames, pathName)) {
428  pathType = std::string("path");
429  }
430  throw Exception(errors::Configuration) <<
431  "The unknown module label \"" << moduleLabel <<
432  "\" appears in " << pathType << " \"" << pathName <<
433  "\"\n please check spelling or remove that label from the path.";
434  }
435  assert(isTracked);
436 
437  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
438  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
439  // We have a filter on an end path, and the filter is not explicitly ignored.
440  // See if the filter is allowed.
441  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
442  if (!search_all(allowed_filters, worker->description().moduleName())) {
443  // Filter is not allowed. Ignore the result, and issue a warning.
444  filterAction = WorkerInPath::Ignore;
445  LogWarning("FilterOnEndPath")
446  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
447  << "The return value of the filter will be ignored.\n"
448  << "To suppress this warning, either remove the filter from the endpath,\n"
449  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
450  }
451  }
452  tmpworkers.emplace_back(worker, filterAction, placeInPath);
453  ++placeInPath;
454  }
455 
456  out.swap(tmpworkers);
457  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void edm::StreamSchedule::finishedPaths ( std::exception_ptr  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal &  ep,
EventSetup const &  es 
)
private

Definition at line 598 of file StreamSchedule.cc.

References mps_alisetup::action, actionTable(), cms::Exception::addContext(), cms::Exception::category(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, end_paths_, endpathsAreActive_, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), finishProcessOneEvent(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, edm::ServiceRegistry::instance(), edm::make_waiting_task(), cmsPerfStripChart::operate(), or, edm::ServiceRegistry::presentToken(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

599  {
600 
601  if(iExcept) {
602  try {
603  std::rethrow_exception(iExcept);
604  }
605  catch(cms::Exception& e) {
606  exception_actions::ActionCodes action = actionTable().find(e.category());
607  assert (action != exception_actions::IgnoreCompletely);
608  assert (action != exception_actions::FailPath);
609  if (action == exception_actions::SkipEvent) {
610  edm::printCmsExceptionWarning("SkipEvent", e);
611  iExcept = std::exception_ptr();
612  } else {
613  iExcept = std::current_exception();
614  }
615  }
616  catch(...) {
617  iExcept = std::current_exception();
618  }
619  }
620 
621 
622  if((not iExcept) and results_->accept()) {
623  ++total_passed_;
624  }
625 
626  if((not iExcept) and (nullptr != results_inserter_.get())) {
627  try {
628  ParentContext parentContext(&streamContext_);
629  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
630 
631  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
632  }
633  catch (cms::Exception & ex) {
634  if(ex.context().empty()) {
635  std::ostringstream ost;
636  ost << "Processing Event " << ep.id();
637  ex.addContext(ost.str());
638  }
639  iExcept = std::current_exception();
640  }
641  catch(...) {
642  iExcept = std::current_exception();
643  }
644  }
645  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
646  iExcept = finishProcessOneEvent(iExcept);
647  iWait.doneWaiting(iExcept);
648  } else {
649  auto serviceToken = ServiceRegistry::instance().presentToken();
650 
651  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
652  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
653  {
654  ServiceRegistry::Operate operate(serviceToken);
655 
656  std::exception_ptr ptr;
657  if(iPtr) {
658  ptr = *iPtr;
659  }
660  iWait.doneWaiting(finishProcessOneEvent(ptr));
661  });
662  //The holder guarantees that if the paths finish before the loop ends
663  // that we do not start too soon. It also guarantees that the task will
664  // run under that condition.
665  WaitingTaskHolder taskHolder(endPathsDone);
666  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
667  it != itEnd; ++it) {
668  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
669  }
670  }
671  }
volatile bool endpathsAreActive_
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
ServiceToken presentToken() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 675 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by finishedPaths().

675  {
676  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
677 
678  if(iExcept) {
679  //add context information to the exception and print message
680  try {
681  convertException::wrap([&]() {
682  std::rethrow_exception(iExcept);
683  });
684  } catch(cms::Exception& ex) {
685  bool const cleaningUpAfterException = false;
686  if (ex.context().empty()) {
687  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
688  } else {
689  addContextAndPrintException("", ex, cleaningUpAfterException);
690  }
691  iExcept = std::current_exception();
692  }
693 
694  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
695  }
696 
697  try {
698  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
699  } catch(...) {
700  if(not iExcept) {
701  iExcept = std::current_exception();
702  }
703  }
704  if(not iExcept ) {
705  resetEarlyDelete();
706  }
707 
708  return iExcept;
709  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 525 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

525  {
526  std::vector<ModuleDescription const*> result;
527  result.reserve(allWorkers().size());
528 
529  for (auto const& worker : allWorkers()) {
530  ModuleDescription const* p = worker->descPtr();
531  result.push_back(p);
532  }
533  return result;
534  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 857 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

857  {
858  rep.eventSummary.totalEvents += totalEvents();
859  rep.eventSummary.totalEventsPassed += totalEventsPassed();
860  rep.eventSummary.totalEventsFailed += totalEventsFailed();
861 
862  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
863  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
864  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
865  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1188
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 246 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), runEdmFileComparison::found, edm::pset::Registry::getMapped(), diffTreeTool::index, edm::InEvent, edm::pset::Registry::instance(), checklumidiff::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

248  {
249  //for now, if have a subProcess, don't allow early delete
250  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
251  if(not allowEarlyDelete) return;
252 
253  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
254  // registered for this job
255  std::multimap<std::string,Worker*> branchToReadingWorker;
256  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
257 
258  //If no delete early items have been specified we don't have to do anything
259  if(branchToReadingWorker.size()==0) {
260  return;
261  }
262  const std::vector<std::string> kEmpty;
263  std::map<Worker*,unsigned int> reserveSizeForWorker;
264  unsigned int upperLimitOnReadingWorker =0;
265  unsigned int upperLimitOnIndicies = 0;
266  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
267 
268  //talk with output modules first
269  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
270  auto comm = iHolder->createOutputModuleCommunicator();
271  if (comm) {
272  if(branchToReadingWorker.size()>0) {
273  //If an OutputModule needs a product, we can't delete it early
274  // so we should remove it from our list
275  SelectedProductsForBranchType const& kept = comm->keptProducts();
276  for(auto const& item: kept[InEvent]) {
277  BranchDescription const& desc = *item.first;
278  auto found = branchToReadingWorker.equal_range(desc.branchName());
279  if(found.first !=found.second) {
280  --nUniqueBranchesToDelete;
281  branchToReadingWorker.erase(found.first,found.second);
282  }
283  }
284  }
285  }
286  });
287 
288  if(branchToReadingWorker.size()==0) {
289  return;
290  }
291 
292  for (auto w :allWorkers()) {
293  //determine if this module could read a branch we want to delete early
294  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
295  if(0!=pset) {
296  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
297  if(not branches.empty()) {
298  ++upperLimitOnReadingWorker;
299  }
300  for(auto const& branch:branches){
301  auto found = branchToReadingWorker.equal_range(branch);
302  if(found.first != found.second) {
303  ++upperLimitOnIndicies;
304  ++reserveSizeForWorker[w];
305  if(nullptr == found.first->second) {
306  found.first->second = w;
307  } else {
308  branchToReadingWorker.insert(make_pair(found.first->first,w));
309  }
310  }
311  }
312  }
313  }
314  {
315  auto it = branchToReadingWorker.begin();
316  std::vector<std::string> unusedBranches;
317  while(it !=branchToReadingWorker.end()) {
318  if(it->second == nullptr) {
319  unusedBranches.push_back(it->first);
320  //erasing the object invalidates the iterator so must advance it first
321  auto temp = it;
322  ++it;
323  branchToReadingWorker.erase(temp);
324  } else {
325  ++it;
326  }
327  }
328  if(not unusedBranches.empty()) {
329  LogWarning l("UnusedProductsForCanDeleteEarly");
330  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
331  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
332  for(auto const& n:unusedBranches){
333  l<<"\n "<<n;
334  }
335  }
336  }
337  if(0!=branchToReadingWorker.size()) {
338  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
339  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
340  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
341  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
342  std::string lastBranchName;
343  size_t nextOpenIndex = 0;
344  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
345  for(auto& branchAndWorker:branchToReadingWorker) {
346  if(lastBranchName != branchAndWorker.first) {
347  //have to put back the period we removed earlier in order to get the proper name
348  BranchID bid(branchAndWorker.first+".");
349  earlyDeleteBranchToCount_.emplace_back(bid,0U);
350  lastBranchName = branchAndWorker.first;
351  }
352  auto found = alreadySeenWorkers.find(branchAndWorker.second);
353  if(alreadySeenWorkers.end() == found) {
354  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
355  // all the branches that might be read by this worker. However, initially we will only tell the
356  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
357  // EarlyDeleteHelper will automatically advance its internal end pointer.
358  size_t index = nextOpenIndex;
359  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
361  earlyDeleteHelpers_.emplace_back(beginAddress+index,
362  beginAddress+index+1,
364  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
365  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
366  nextOpenIndex +=nIndices;
367  } else {
368  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
369  }
370  }
371 
372  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
373  // space needed for each module
374  auto itLast = earlyDeleteHelpers_.begin();
375  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
376  if(itLast->end() != it->begin()) {
377  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
378  unsigned int delta = it->begin()- itLast->end();
379  it->shiftIndexPointers(delta);
380 
382  (itLast->end()-beginAddress),
384  (it->begin()-beginAddress));
385  }
386  itLast = it;
387  }
388  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
390 
391  //now tell the paths about the deleters
392  for(auto& p : trig_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
395  for(auto& p : end_paths_) {
396  p.setEarlyDeleteHelpers(alreadySeenWorkers);
397  }
399  }
400  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 903 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

906  {
907 
908  int bitpos = 0;
909  unsigned int indexEmpty = 0;
910  unsigned int indexOfPath = 0;
911  for(auto & pathStatusInserter : pathStatusInserters) {
912  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
914  inserterPtr->moduleDescription(),
915  &actions));
916  pathStatusInserterWorkers_.emplace_back(workerPtr);
917  workerPtr->setActivityRegistry(actReg_);
918  addToAllWorkers(workerPtr.get());
919 
920  // A little complexity here because a C++ Path object is not
921  // instantiated and put into end_paths if there are no modules
922  // on the configured path.
923  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
924  ++indexEmpty;
925  } else {
926  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
927  workerPtr.get());
928  ++indexOfPath;
929  }
930  ++bitpos;
931  }
932 
933  bitpos = 0;
934  indexEmpty = 0;
935  indexOfPath = 0;
936  for(auto & endPathStatusInserter : endPathStatusInserters) {
937  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
939  inserterPtr->moduleDescription(),
940  &actions));
941  endPathStatusInserterWorkers_.emplace_back(workerPtr);
942  workerPtr->setActivityRegistry(actReg_);
943  addToAllWorkers(workerPtr.get());
944 
945  // A little complexity here because a C++ Path object is not
946  // instantiated and put into end_paths if there are no modules
947  // on the configured path.
948  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
949  ++indexEmpty;
950  } else {
951  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr,
952  workerPtr.get());
953  ++indexOfPath;
954  }
955  ++bitpos;
956  }
957  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 768 of file StreamSchedule.cc.

References end_paths_, runEdmFileComparison::found, mps_fire::i, and edm::Path::name().

770  {
771  descriptions.clear();
772  bool found = false;
773  TrigPaths::const_iterator itFound;
774 
775  if(hint < end_paths_.size()) {
776  itFound = end_paths_.begin() + hint;
777  if(itFound->name() == iEndPathLabel) found = true;
778  }
779  if(!found) {
780  // if the hint did not work, do it the slow way
781  itFound = std::find_if (end_paths_.begin(),
782  end_paths_.end(),
783  std::bind(std::equal_to<std::string>(),
784  iEndPathLabel,
785  std::bind(&Path::name, std::placeholders::_1)));
786  if (itFound != end_paths_.end()) found = true;
787  }
788  if (found) {
789  descriptions.reserve(itFound->size());
790  for (size_t i = 0; i < itFound->size(); ++i) {
791  descriptions.push_back(itFound->getWorker(i)->descPtr());
792  }
793  }
794  }
std::string const & name() const
Definition: Path.h:77
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 739 of file StreamSchedule.cc.

References runEdmFileComparison::found, mps_fire::i, edm::Path::name(), and trig_paths_.

741  {
742  descriptions.clear();
743  bool found = false;
744  TrigPaths::const_iterator itFound;
745 
746  if(hint < trig_paths_.size()) {
747  itFound = trig_paths_.begin() + hint;
748  if(itFound->name() == iPathLabel) found = true;
749  }
750  if(!found) {
751  // if the hint did not work, do it the slow way
752  itFound = std::find_if (trig_paths_.begin(),
753  trig_paths_.end(),
754  std::bind(std::equal_to<std::string>(),
755  iPathLabel,
756  std::bind(&Path::name, std::placeholders::_1)));
757  if (itFound != trig_paths_.end()) found = true;
758  }
759  if (found) {
760  descriptions.reserve(itFound->size());
761  for (size_t i = 0; i < itFound->size(); ++i) {
762  descriptions.push_back(itFound->getWorker(i)->descPtr());
763  }
764  }
765  }
std::string const & name() const
Definition: Path.h:77
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 722 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

723  {
724  TrigPaths::const_iterator itFound =
725  std::find_if (trig_paths_.begin(),
726  trig_paths_.end(),
727  std::bind(std::equal_to<std::string>(),
728  iPathLabel,
729  std::bind(&Path::name, std::placeholders::_1)));
730  if (itFound!=trig_paths_.end()) {
731  oLabelsToFill.reserve(itFound->size());
732  for (size_t i = 0; i < itFound->size(); ++i) {
733  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
734  }
735  }
736  }
std::string const & name() const
Definition: Path.h:77
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 265 of file StreamSchedule.h.

265  {
267  }
unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetup const &  es,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 536 of file StreamSchedule.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, endPathStatusInserterWorkers_, finishedPaths(), edm::ServiceRegistry::instance(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::ServiceRegistry::presentToken(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

539  {
540  this->resetAll();
541 
542  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
543 
544  Traits::setStreamContext(streamContext_, ep);
545  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
546 
547  HLTPathStatus hltPathStatus(hlt::Pass, 0);
548  for (int empty_trig_path : empty_trig_paths_) {
549  results_->at(empty_trig_path) = hltPathStatus;
550  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
551  std::exception_ptr iException = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
552  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
553  );
554  if (iException) {
555  iTask.doneWaiting(iException);
556  return;
557  }
558  }
559  for (int empty_end_path : empty_end_paths_) {
560  std::exception_ptr iException = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
561  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
562  );
563  if (iException) {
564  iTask.doneWaiting(iException);
565  return;
566  }
567  }
568 
569  // This call takes care of the unscheduled processing.
571 
572  ++total_events_;
573  auto serviceToken = ServiceRegistry::instance().presentToken();
574  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
575  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
576  {
577  ServiceRegistry::Operate operate(serviceToken);
578 
579  std::exception_ptr ptr;
580  if(iPtr) {
581  ptr = *iPtr;
582  }
583  finishedPaths(ptr, std::move(iTask), ep, es);
584  });
585 
586  //The holder guarantees that if the paths finish before the loop ends
587  // that we do not start too soon. It also guarantees that the task will
588  // run under that condition.
589  WaitingTaskHolder taskHolder(pathsDone);
590 
591  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
592  it != itEnd; ++ it) {
593  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
594  }
595  }
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
ServiceToken presentToken() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:19
StreamContext streamContext_
static ServiceRegistry & instance()
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
def move(src, dest)
Definition: eostools.py:510
def operate(timelog, memlog, json_f, num)
template<typename T >
void edm::StreamSchedule::processOneStream ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 395 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), and edm::convertException::wrap().

397  {
398  this->resetAll();
399 
400  T::setStreamContext(streamContext_, ep);
401  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
402 
403  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
404 
405  // This call takes care of the unscheduled processing.
406  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
407 
408  try {
409  convertException::wrap([&]() {
410  runTriggerPaths<T>(ep, es, &streamContext_);
411 
412  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
413  });
414  }
415  catch(cms::Exception& ex) {
416  if (ex.context().empty()) {
417  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
418  } else {
419  addContextAndPrintException("", ex, cleaningUpAfterException);
420  }
421  throw;
422  }
423  terminationSentry.completedSuccessfully();
424 
425  //If we got here no other exception has happened so we can propagate any Service related exceptions
426  sentry.allowThrow();
427  }
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
volatile bool endpathsAreActive_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
long double T
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 430 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, h, hcalTTPDigis_cfi::id, edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::ServiceRegistry::presentToken(), and edm::convertException::wrap().

433  {
435 
436  T::setStreamContext(streamContext_, ep);
437 
438  auto id = ep.id();
439  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
440  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
441  {
442  ServiceRegistry::Operate op(token);
443  std::exception_ptr excpt;
444  if(iPtr) {
445  excpt = *iPtr;
446  //add context information to the exception and print message
447  try {
448  convertException::wrap([&]() {
449  std::rethrow_exception(excpt);
450  });
451  } catch(cms::Exception& ex) {
452  //TODO: should add the transition type info
453  std::ostringstream ost;
454  if(ex.context().empty()) {
455  ost<<"Processing "<<T::transitionName()<<" "<<id;
456  }
457  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
458  excpt = std::current_exception();
459  }
460 
461  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
462  }
463 
464  try {
465  T::postScheduleSignal(actReg_.get(), &streamContext_);
466  } catch(...) {
467  if(not excpt) {
468  excpt = std::current_exception();
469  }
470  }
471  iHolder.doneWaiting(excpt);
472 
473  });
474 
475  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask,&ep,&es,cleaningUpAfterException,token] () mutable {
476  ServiceRegistry::Operate op(token);
477  T::preScheduleSignal(actReg_.get(), &streamContext_);
478  WaitingTaskHolder h(doneTask);
479 
481  for(auto& p : end_paths_) {
482  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
483  }
484 
485  for(auto& p : trig_paths_) {
486  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
487  }
488 
491  });
492 
493  if(streamID_.value() == 0) {
494  //Enqueueing will start another thread if there is only
495  // one thread in the job. Having stream == 0 use spawn
496  // avoids starting up another thread when there is only one stream.
497  tbb::task::spawn( *task);
498  } else {
499  tbb::task::enqueue( *task);
500  }
501  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ServiceToken presentToken() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
unsigned int value() const
Definition: StreamID.h:46
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

Replace the module used by the worker with label iLabel with the one held by iMod, then call beginStream on that worker. Does nothing if no worker has that label.

Definition at line 507 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), runEdmFileComparison::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

508  {
509  Worker* found = nullptr;
510  for (auto const& worker : allWorkers()) {
511  if (worker->description().moduleLabel() == iLabel) {
512  found = worker;
513  break;
514  }
515  }
516  if (nullptr == found) {
517  return;
518  }
519 
520  iMod->replaceModuleFor(found);
521  found->beginStream(streamID_,streamContext_);
522  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 389 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

389  {
390  Service<JobReport> reportSvc;
391  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
392  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 877 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

877  {
878  skippingEvent_ = false;
879  results_->reset();
880  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 888 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, and diffTreeTool::index.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

888  {
889  //must be sure we have cleared the count first
890  for(auto& count:earlyDeleteBranchToCount_) {
891  count.count = 0;
892  }
893  //now reset based on how many helpers use that branch
895  ++(earlyDeleteBranchToCount_[index].count);
896  }
897  for(auto& helper: earlyDeleteHelpers_) {
898  helper.reset();
899  }
900  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 340 of file StreamSchedule.h.

References edm::get_underlying_safe().

Referenced by StreamSchedule().

340 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 341 of file StreamSchedule.h.

References actions, and edm::get_underlying_safe().

341 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
template<typename T >
void edm::StreamSchedule::runEndPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 515 of file StreamSchedule.h.

References AlCaHLTBitMon_ParallelJobs::p.

515  {
516  // Note there is no state-checking safety controlling the
517  // activation/deactivation of endpaths.
518  for(auto& p : end_paths_) {
519  p.processOneOccurrence<T>(ep, es, streamID_, context);
520  }
521  }
StreamContext const & context() const
long double T
template<typename T >
bool edm::StreamSchedule::runTriggerPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 506 of file StreamSchedule.h.

References AlCaHLTBitMon_ParallelJobs::p.

506  {
507  for(auto& p : trig_paths_) {
508  p.processOneOccurrence<T>(ep, es, streamID_, context);
509  }
510  return results_->accept();
511  }
edm::propagate_const< TrigResPtr > results_
StreamContext const & context() const
long double T
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 198 of file StreamSchedule.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

198 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

Referenced by getTriggerReport().

226  {
227  return total_events_;
228  }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 238 of file StreamSchedule.h.

References cuy::rep, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getTriggerReport().

238  {
239  return totalEvents() - totalEventsPassed();
240  }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 232 of file StreamSchedule.h.

Referenced by getTriggerReport().

232  {
233  return total_passed_;
234  }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 375 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 372 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 383 of file StreamSchedule.h.

Referenced by enableEndPaths(), endPathsEnabled(), and finishedPaths().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by StreamSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 351 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), and resetAll().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 384 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 377 of file StreamSchedule.h.

Referenced by clearCounters(), and processOneEventAsync().

int edm::StreamSchedule::total_passed_
private

Definition at line 378 of file StreamSchedule.h.

Referenced by clearCounters(), and finishedPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private