CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< Path > NonTrigPaths
 
typedef std::vector< WorkerInPath > PathWorkers
 
typedef std::vector< Path > TrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatus > TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< Worker > WorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtr results ()
 

Private Attributes

std::shared_ptr< ActivityRegistry > actReg_
 
std::vector< BranchToCount > earlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtr > results_
 
edm::propagate_const< WorkerPtr > results_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 147 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 155 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::NonTrigPaths

Definition at line 151 of file StreamSchedule.h.

typedef std::vector<WorkerInPath> edm::StreamSchedule::PathWorkers

Definition at line 159 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 150 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 153 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus> edm::StreamSchedule::TrigResPtr

Definition at line 152 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 149 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 154 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 157 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter > inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry > modReg,
ParameterSet & proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry & pregistry,
BranchIDListHelper & branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry > areg,
std::shared_ptr< ProcessConfiguration > processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 137 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), diffTwoXMLs::label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, edm::WorkerManager::setOnDemandProducts(), trig_paths_, edm::StreamID::value(), and workerManager_.

151  :
152  workerManager_(modReg,areg, actions),
153  actReg_(areg),
154  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
156  trig_paths_(),
157  end_paths_(),
158  total_events_(),
159  total_passed_(),
162  streamContext_(streamID_, processContext),
163  endpathsAreActive_(true),
164  skippingEvent_(false){
165 
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
206  usedWorkerLabels.begin(), usedWorkerLabels.end(),
207  back_inserter(unusedLabels));
208  std::set<std::string> unscheduledLabels;
209  std::vector<std::string> shouldBeUsedLabels;
210  if (!unusedLabels.empty()) {
211  //Need to
212  // 1) create worker
213  // 2) if it is a WorkerT<EDProducer>, add it to our list
214  // 3) hand list to our delayed reader
215  for (auto const& label : unusedLabels) {
216  bool isTracked;
217  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
218  assert(isTracked);
219  assert(modulePSet != nullptr);
220  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
221  }
222  if (!shouldBeUsedLabels.empty()) {
223  std::ostringstream unusedStream;
224  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
225  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226  itLabelEnd = shouldBeUsedLabels.end();
227  itLabel != itLabelEnd;
228  ++itLabel) {
229  unusedStream << ",'" << *itLabel << "'";
230  }
231  LogInfo("path")
232  << "The following module labels are not assigned to any path:\n"
233  << unusedStream.str()
234  << "\n";
235  }
236  }
237  if (!unscheduledLabels.empty()) {
238  number_of_unscheduled_modules_=unscheduledLabels.size();
239  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
240  }
241 
242 
243  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
244 
245  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
unsigned int value() const
Definition: StreamID.h:46
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
TrigResConstPtr results() const
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 291 of file StreamSchedule.h.

References dataset::name, geometryDiff::opts, MillePedeFileConverter_cfg::out, AlCaHLTBitMon_QueryRunRegistry::string, and w.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

291  {
292  return workerManager_.actionTable();
293  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:90
void edm::StreamSchedule::addToAllWorkers ( Worker * w)
private

Definition at line 910 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), makePathStatusInserters(), and StreamSchedule().

910  {
912  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 258 of file StreamSchedule.h.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

258  {
259  return workerManager_.allWorkers();
260  }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:86
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 740 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

740  {
741  oLabelsToFill.reserve(trig_paths_.size());
742  std::transform(trig_paths_.begin(),
743  trig_paths_.end(),
744  std::back_inserter(oLabelsToFill),
745  std::bind(&Path::name, std::placeholders::_1));
746  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::beginStream ( )

Definition at line 501 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

501  {
503  }
WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 895 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

895  {
896  using std::placeholders::_1;
898  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
899  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
900  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
901  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:212
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void clearCounters()
Definition: Path.cc:171
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 266 of file StreamSchedule.h.

266 { return streamContext_;}
StreamContext streamContext_
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 824 of file StreamSchedule.cc.

References endpathsAreActive_.

824  {
825  endpathsAreActive_ = active;
826  }
volatile bool endpathsAreActive_
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 829 of file StreamSchedule.cc.

References endpathsAreActive_.

829  {
830  return endpathsAreActive_;
831  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 505 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

505  {
507  }
void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 481 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

486  {
487  PathWorkers tmpworkers;
488  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
489 
490  if (!tmpworkers.empty()) {
491  //EndPaths are not supposed to stop if SkipEvent type exception happens
492  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
493  } else {
494  empty_end_paths_.push_back(bitpos);
495  }
496  for (WorkerInPath const& workerInPath : tmpworkers) {
497  addToAllWorkers(workerInPath.getWorker());
498  }
499  }
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void edm::StreamSchedule::fillTrigPath ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 461 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

466  {
467  PathWorkers tmpworkers;
468  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
469 
470  // an empty path will cause an extra bit that is not used
471  if (!tmpworkers.empty()) {
472  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
473  } else {
474  empty_trig_paths_.push_back(bitpos);
475  }
476  for (WorkerInPath const& workerInPath : tmpworkers) {
477  addToAllWorkers(workerInPath.getWorker());
478  }
479  }
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::fillWorkers ( ParameterSet & proc_pset,
ProductRegistry & preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers & out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 404 of file StreamSchedule.cc.

References edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), dataset::name, edm::WorkerInPath::Normal, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

411  {
412  vstring modnames = proc_pset.getParameter<vstring>(pathName);
413  PathWorkers tmpworkers;
414 
415  unsigned int placeInPath = 0;
416  for (auto const& name : modnames) {
417 
419  if (name[0] == '!') filterAction = WorkerInPath::Veto;
420  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
421 
422  std::string moduleLabel = name;
423  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
424 
425  bool isTracked;
426  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
427  if (modpset == nullptr) {
428  std::string pathType("endpath");
429  if (!search_all(endPathNames, pathName)) {
430  pathType = std::string("path");
431  }
433  "The unknown module label \"" << moduleLabel <<
434  "\" appears in " << pathType << " \"" << pathName <<
435  "\"\n please check spelling or remove that label from the path.";
436  }
437  assert(isTracked);
438 
439  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
440  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
441  // We have a filter on an end path, and the filter is not explicitly ignored.
442  // See if the filter is allowed.
443  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
444  if (!search_all(allowed_filters, worker->description().moduleName())) {
445  // Filter is not allowed. Ignore the result, and issue a warning.
446  filterAction = WorkerInPath::Ignore;
447  LogWarning("FilterOnEndPath")
448  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
449  << "The return value of the filter will be ignored.\n"
450  << "To suppress this warning, either remove the filter from the endpath,\n"
451  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
452  }
453  }
454  tmpworkers.emplace_back(worker, filterAction, placeInPath);
455  ++placeInPath;
456  }
457 
458  out.swap(tmpworkers);
459  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal & ep,
EventSetup const &  es 
)
private

Definition at line 641 of file StreamSchedule.cc.

References mps_fire::action, actionTable(), cms::Exception::addContext(), cms::Exception::category(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

642  {
643 
644  if(iExcept) {
645  try {
646  std::rethrow_exception(*(iExcept.load()));
647  }
648  catch(cms::Exception& e) {
650  assert (action != exception_actions::IgnoreCompletely);
651  assert (action != exception_actions::FailPath);
652  if (action == exception_actions::SkipEvent) {
653  edm::printCmsExceptionWarning("SkipEvent", e);
654  *(iExcept.load()) = std::exception_ptr();
655  } else {
656  *(iExcept.load()) = std::current_exception();
657  }
658  }
659  catch(...) {
660  *(iExcept.load()) = std::current_exception();
661  }
662  }
663 
664 
665  if((not iExcept) and results_->accept()) {
666  ++total_passed_;
667  }
668 
669  if(nullptr != results_inserter_.get()) {
670  try {
671  //Even if there was an exception, we need to allow results inserter
672  // to run since some module may be waiting on its results.
673  ParentContext parentContext(&streamContext_);
674  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
675 
676  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
677  }
678  catch (cms::Exception & ex) {
679  if (not iExcept) {
680  if(ex.context().empty()) {
681  std::ostringstream ost;
682  ost << "Processing Event " << ep.id();
683  ex.addContext(ost.str());
684  }
685  iExcept.store( new std::exception_ptr(std::current_exception()));
686  }
687  }
688  catch(...) {
689  if (not iExcept) {
690  iExcept.store(new std::exception_ptr(std::current_exception()));
691  }
692  }
693  }
694  std::exception_ptr ptr;
695  if(iExcept) {
696  ptr = *iExcept.load();
697  }
698  iWait.doneWaiting(ptr);
699  }
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:227
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 703 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

703  {
704  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
705 
706  if(iExcept) {
707  //add context information to the exception and print message
708  try {
709  convertException::wrap([&]() {
710  std::rethrow_exception(iExcept);
711  });
712  } catch(cms::Exception& ex) {
713  bool const cleaningUpAfterException = false;
714  if (ex.context().empty()) {
715  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
716  } else {
717  addContextAndPrintException("", ex, cleaningUpAfterException);
718  }
719  iExcept = std::current_exception();
720  }
721 
722  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
723  }
724 
725  try {
726  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
727  } catch(...) {
728  if(not iExcept) {
729  iExcept = std::current_exception();
730  }
731  }
732  if(not iExcept ) {
734  }
735 
736  return iExcept;
737  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. N.B.: Ownership of the ModuleDescriptions is not passed to the caller. Do not call delete on these pointers!

Definition at line 527 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

527  {
528  std::vector<ModuleDescription const*> result;
529  result.reserve(allWorkers().size());
530 
531  for (auto const& worker : allWorkers()) {
532  ModuleDescription const* p = worker->descPtr();
533  result.push_back(p);
534  }
535  return result;
536  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::StreamSchedule::getTriggerReport ( TriggerReport & rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 884 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

884  {
885  rep.eventSummary.totalEvents += totalEvents();
886  rep.eventSummary.totalEventsPassed += totalEventsPassed();
887  rep.eventSummary.totalEventsFailed += totalEventsFailed();
888 
889  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
890  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
891  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
892  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1189
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 248 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), runEdmFileComparison::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), checklumidiff::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

250  {
251  //for now, if have a subProcess, don't allow early delete
252  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
253  if(not allowEarlyDelete) return;
254 
255  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
256  // registered for this job
257  std::multimap<std::string,Worker*> branchToReadingWorker;
258  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 
260  //If no delete early items have been specified we don't have to do anything
261  if(branchToReadingWorker.empty()) {
262  return;
263  }
264  const std::vector<std::string> kEmpty;
265  std::map<Worker*,unsigned int> reserveSizeForWorker;
266  unsigned int upperLimitOnReadingWorker =0;
267  unsigned int upperLimitOnIndicies = 0;
268  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
269 
270  //talk with output modules first
271  modReg.forAllModuleHolders([&branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
272  auto comm = iHolder->createOutputModuleCommunicator();
273  if (comm) {
274  if(!branchToReadingWorker.empty()) {
275  //If an OutputModule needs a product, we can't delete it early
276  // so we should remove it from our list
277  SelectedProductsForBranchType const& kept = comm->keptProducts();
278  for(auto const& item: kept[InEvent]) {
279  BranchDescription const& desc = *item.first;
280  auto found = branchToReadingWorker.equal_range(desc.branchName());
281  if(found.first !=found.second) {
282  --nUniqueBranchesToDelete;
283  branchToReadingWorker.erase(found.first,found.second);
284  }
285  }
286  }
287  }
288  });
289 
290  if(branchToReadingWorker.empty()) {
291  return;
292  }
293 
294  for (auto w :allWorkers()) {
295  //determine if this module could read a branch we want to delete early
296  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
297  if(nullptr!=pset) {
298  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
299  if(not branches.empty()) {
300  ++upperLimitOnReadingWorker;
301  }
302  for(auto const& branch:branches){
303  auto found = branchToReadingWorker.equal_range(branch);
304  if(found.first != found.second) {
305  ++upperLimitOnIndicies;
306  ++reserveSizeForWorker[w];
307  if(nullptr == found.first->second) {
308  found.first->second = w;
309  } else {
310  branchToReadingWorker.insert(make_pair(found.first->first,w));
311  }
312  }
313  }
314  }
315  }
316  {
317  auto it = branchToReadingWorker.begin();
318  std::vector<std::string> unusedBranches;
319  while(it !=branchToReadingWorker.end()) {
320  if(it->second == nullptr) {
321  unusedBranches.push_back(it->first);
322  //erasing the object invalidates the iterator so must advance it first
323  auto temp = it;
324  ++it;
325  branchToReadingWorker.erase(temp);
326  } else {
327  ++it;
328  }
329  }
330  if(not unusedBranches.empty()) {
331  LogWarning l("UnusedProductsForCanDeleteEarly");
332  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
333  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334  for(auto const& n:unusedBranches){
335  l<<"\n "<<n;
336  }
337  }
338  }
339  if(!branchToReadingWorker.empty()) {
340  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
341  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
342  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
343  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
344  std::string lastBranchName;
345  size_t nextOpenIndex = 0;
346  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
347  for(auto& branchAndWorker:branchToReadingWorker) {
348  if(lastBranchName != branchAndWorker.first) {
349  //have to put back the period we removed earlier in order to get the proper name
350  BranchID bid(branchAndWorker.first+".");
351  earlyDeleteBranchToCount_.emplace_back(bid,0U);
352  lastBranchName = branchAndWorker.first;
353  }
354  auto found = alreadySeenWorkers.find(branchAndWorker.second);
355  if(alreadySeenWorkers.end() == found) {
356  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
357  // all the branches that might be read by this worker. However, initially we will only tell the
358  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
359  // EarlyDeleteHelper will automatically advance its internal end pointer.
360  size_t index = nextOpenIndex;
361  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
363  earlyDeleteHelpers_.emplace_back(beginAddress+index,
364  beginAddress+index+1,
366  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
367  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
368  nextOpenIndex +=nIndices;
369  } else {
370  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
371  }
372  }
373 
374  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
375  // space needed for each module
376  auto itLast = earlyDeleteHelpers_.begin();
377  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
378  if(itLast->end() != it->begin()) {
379  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
380  unsigned int delta = it->begin()- itLast->end();
381  it->shiftIndexPointers(delta);
382 
384  (itLast->end()-beginAddress),
386  (it->begin()-beginAddress));
387  }
388  itLast = it;
389  }
390  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
392 
393  //now tell the paths about the deleters
394  for(auto& p : trig_paths_) {
395  p.setEarlyDeleteHelpers(alreadySeenWorkers);
396  }
397  for(auto& p : end_paths_) {
398  p.setEarlyDeleteHelpers(alreadySeenWorkers);
399  }
401  }
402  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:19
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
static Registry * instance()
Definition: Registry.cc:13
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 930 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

933  {
934 
935  int bitpos = 0;
936  unsigned int indexEmpty = 0;
937  unsigned int indexOfPath = 0;
938  for(auto & pathStatusInserter : pathStatusInserters) {
939  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
941  inserterPtr->moduleDescription(),
942  &actions));
943  pathStatusInserterWorkers_.emplace_back(workerPtr);
944  workerPtr->setActivityRegistry(actReg_);
945  addToAllWorkers(workerPtr.get());
946 
947  // A little complexity here because a C++ Path object is not
948  // instantiated and put into end_paths if there are no modules
949  // on the configured path.
950  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
951  ++indexEmpty;
952  } else {
953  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(),
954  workerPtr.get());
955  ++indexOfPath;
956  }
957  ++bitpos;
958  }
959 
960  bitpos = 0;
961  indexEmpty = 0;
962  indexOfPath = 0;
963  for(auto & endPathStatusInserter : endPathStatusInserters) {
964  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
966  inserterPtr->moduleDescription(),
967  &actions));
968  endPathStatusInserterWorkers_.emplace_back(workerPtr);
969  workerPtr->setActivityRegistry(actReg_);
970  addToAllWorkers(workerPtr.get());
971 
972  // A little complexity here because a C++ Path object is not
973  // instantiated and put into end_paths if there are no modules
974  // on the configured path.
975  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
976  ++indexEmpty;
977  } else {
978  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr,
979  workerPtr.get());
980  ++indexOfPath;
981  }
982  ++bitpos;
983  }
984  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 795 of file StreamSchedule.cc.

References end_paths_, runEdmFileComparison::found, mps_fire::i, and edm::Path::name().

797  {
798  descriptions.clear();
799  bool found = false;
800  TrigPaths::const_iterator itFound;
801 
802  if(hint < end_paths_.size()) {
803  itFound = end_paths_.begin() + hint;
804  if(itFound->name() == iEndPathLabel) found = true;
805  }
806  if(!found) {
807  // if the hint did not work, do it the slow way
808  itFound = std::find_if (end_paths_.begin(),
809  end_paths_.end(),
810  std::bind(std::equal_to<std::string>(),
811  iEndPathLabel,
812  std::bind(&Path::name, std::placeholders::_1)));
813  if (itFound != end_paths_.end()) found = true;
814  }
815  if (found) {
816  descriptions.reserve(itFound->size());
817  for (size_t i = 0; i < itFound->size(); ++i) {
818  descriptions.push_back(itFound->getWorker(i)->descPtr());
819  }
820  }
821  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 766 of file StreamSchedule.cc.

References runEdmFileComparison::found, mps_fire::i, edm::Path::name(), and trig_paths_.

768  {
769  descriptions.clear();
770  bool found = false;
771  TrigPaths::const_iterator itFound;
772 
773  if(hint < trig_paths_.size()) {
774  itFound = trig_paths_.begin() + hint;
775  if(itFound->name() == iPathLabel) found = true;
776  }
777  if(!found) {
778  // if the hint did not work, do it the slow way
779  itFound = std::find_if (trig_paths_.begin(),
780  trig_paths_.end(),
781  std::bind(std::equal_to<std::string>(),
782  iPathLabel,
783  std::bind(&Path::name, std::placeholders::_1)));
784  if (itFound != trig_paths_.end()) found = true;
785  }
786  if (found) {
787  descriptions.reserve(itFound->size());
788  for (size_t i = 0; i < itFound->size(); ++i) {
789  descriptions.push_back(itFound->getWorker(i)->descPtr());
790  }
791  }
792  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 749 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

750  {
751  TrigPaths::const_iterator itFound =
752  std::find_if (trig_paths_.begin(),
753  trig_paths_.end(),
754  std::bind(std::equal_to<std::string>(),
755  iPathLabel,
756  std::bind(&Path::name, std::placeholders::_1)));
757  if (itFound!=trig_paths_.end()) {
758  oLabelsToFill.reserve(itFound->size());
759  for (size_t i = 0; i < itFound->size(); ++i) {
760  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
761  }
762  }
763  }
std::string const & name() const
Definition: Path.h:75
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 262 of file StreamSchedule.h.

262  {
264  }
unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetup const &  es,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 538 of file StreamSchedule.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, finishedPaths(), finishProcessOneEvent(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

542  {
543  try {
544  this->resetAll();
545 
546  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
547 
548  Traits::setStreamContext(streamContext_, ep);
549  //a service may want to communicate with another service
550  ServiceRegistry::Operate guard(serviceToken);
551  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
552 
553  HLTPathStatus hltPathStatus(hlt::Pass, 0);
554  for (int empty_trig_path : empty_trig_paths_) {
555  results_->at(empty_trig_path) = hltPathStatus;
556  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
557  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
558  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
559  );
560  if (except) {
561  iTask.doneWaiting(except);
562  return;
563  }
564  }
565  for (int empty_end_path : empty_end_paths_) {
566  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
567  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_
568  );
569  if (except) {
570  iTask.doneWaiting(except);
571  return;
572  }
573  }
574 
575  // This call takes care of the unscheduled processing.
577 
578  ++total_events_;
579 
580  //use to give priorities on an error to ones from Paths
581  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
582  auto pathErrorPtr = pathErrorHolder.get();
583  auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
584  [iTask,this,serviceToken,pathError=std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable
585  {
586  ServiceRegistry::Operate operate(serviceToken);
587 
588  std::exception_ptr ptr;
589  if(pathError->load()) {
590  ptr = *pathError->load();
591  delete pathError->load();
592  }
593  if( (not ptr) and iPtr) {
594  ptr = *iPtr;
595  }
596  iTask.doneWaiting(finishProcessOneEvent(ptr));
597  });
598  //The holder guarantees that if the paths finish before the loop ends
599  // that we do not start too soon. It also guarantees that the task will
600  // run under that condition.
601  WaitingTaskHolder allPathsHolder(allPathsDone);
602 
603  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
604  [allPathsHolder,pathErrorPtr,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
605  {
606  ServiceRegistry::Operate operate(serviceToken);
607 
608  if(iPtr) {
609  //this is used to prioritize this error over one
610  // that happens in EndPath or Accumulate
611  pathErrorPtr->store( new std::exception_ptr(*iPtr) );
612  }
613  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
614  });
615 
616  //The holder guarantees that if the paths finish before the loop ends
617  // that we do not start too soon. It also guarantees that the task will
618  // run under that condition.
619  WaitingTaskHolder taskHolder(pathsDone);
620 
621  //start end paths first so on single threaded the paths will run first
622  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
623  it != itEnd; ++it) {
624  it->processOneOccurrenceAsync(allPathsDone,ep, es, serviceToken, streamID_, &streamContext_);
625  }
626 
627  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
628  it != itEnd; ++ it) {
629  it->processOneOccurrenceAsync(pathsDone,ep, es, serviceToken, streamID_, &streamContext_);
630  }
631 
632  ParentContext parentContext(&streamContext_);
633  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
634  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
635  }catch (...) {
636  iTask.doneWaiting(std::current_exception());
637  }
638  }
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
accept
Definition: HLTenums.h:19
StreamContext streamContext_
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void setupOnDemandSystem(Principal &principal, EventSetup const &es)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetup const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
def move(src, dest)
Definition: eostools.py:510
def operate(timelog, memlog, json_f, num)
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 386 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, h, triggerObjects_cff::id, edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, and edm::convertException::wrap().

390  {
391  T::setStreamContext(streamContext_, ep);
392 
393  auto id = ep.id();
394  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
395  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
396  {
397  std::exception_ptr excpt;
398  if(iPtr) {
399  excpt = *iPtr;
400  //add context information to the exception and print message
401  try {
402  convertException::wrap([&]() {
403  std::rethrow_exception(excpt);
404  });
405  } catch(cms::Exception& ex) {
406  //TODO: should add the transition type info
407  std::ostringstream ost;
408  if(ex.context().empty()) {
409  ost<<"Processing "<<T::transitionName()<<" "<<id;
410  }
411  ServiceRegistry::Operate op(token);
412  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
413  excpt = std::current_exception();
414  }
415 
416  ServiceRegistry::Operate op(token);
417  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
418  }
419 
420  try {
421  ServiceRegistry::Operate op(token);
422  T::postScheduleSignal(actReg_.get(), &streamContext_);
423  } catch(...) {
424  if(not excpt) {
425  excpt = std::current_exception();
426  }
427  }
428  iHolder.doneWaiting(excpt);
429 
430  });
431 
432  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask, h =WaitingTaskHolder(doneTask) ,&ep,&es,token] () mutable {
433  ServiceRegistry::Operate op(token);
434  try {
435  T::preScheduleSignal(actReg_.get(), &streamContext_);
436 
438  }catch(...) {
439  h.doneWaiting(std::current_exception());
440  return;
441  }
442 
443  for(auto& p : end_paths_) {
444  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
445  }
446 
447  for(auto& p : trig_paths_) {
448  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
449  }
450 
452  ep, es, token, streamID_, &streamContext_, &streamContext_);
453  });
454 
455  if(streamID_.value() == 0) {
456  //Enqueueing will start another thread if there is only
457  // one thread in the job. Having stream == 0 use spawn
458  // avoids starting up another thread when there is only one stream.
459  tbb::task::spawn( *task);
460  } else {
461  tbb::task::enqueue( *task);
462  }
463  }
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
unsigned int value() const
Definition: StreamID.h:46
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

Replace the module of the worker whose module label is iLabel using iMod, then call beginStream on that worker. Does nothing if no worker has that label.

Definition at line 509 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), runEdmFileComparison::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

510  {
511  Worker* found = nullptr;
512  for (auto const& worker : allWorkers()) {
513  if (worker->description().moduleLabel() == iLabel) {
514  found = worker;
515  break;
516  }
517  }
518  if (nullptr == found) {
519  return;
520  }
521 
522  iMod->replaceModuleFor(found);
523  found->beginStream(streamID_,streamContext_);
524  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 380 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

380  {
381  Service<JobReport> reportSvc;
382  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
383  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 904 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

904  {
905  skippingEvent_ = false;
906  results_->reset();
907  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 915 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

915  {
916  //must be sure we have cleared the count first
917  for(auto& count:earlyDeleteBranchToCount_) {
918  count.count = 0;
919  }
920  //now reset based on how many helpers use that branch
922  ++(earlyDeleteBranchToCount_[index].count);
923  }
924  for(auto& helper: earlyDeleteHelpers_) {
925  helper.reset();
926  }
927  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 331 of file StreamSchedule.h.

References edm::get_underlying_safe().

Referenced by StreamSchedule().

331 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 332 of file StreamSchedule.h.

References actions, and edm::get_underlying_safe().

332 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 195 of file StreamSchedule.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

195 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (includes both successes and failures, including failures due to exceptions during processing).

Definition at line 223 of file StreamSchedule.h.

Referenced by getTriggerReport().

223  {
224  return total_events_;
225  }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents().)

Definition at line 235 of file StreamSchedule.h.

References cuy::rep, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getTriggerReport().

235  {
236  return totalEvents() - totalEventsPassed();
237  }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 229 of file StreamSchedule.h.

Referenced by getTriggerReport().

229  {
230  return total_passed_;
231  }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 363 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 351 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 350 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by enableEndPaths(), and endPathsEnabled().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by StreamSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 345 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 342 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), and resetAll().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 344 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 375 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 368 of file StreamSchedule.h.

Referenced by clearCounters(), and processOneEventAsync().

int edm::StreamSchedule::total_passed_
private

Definition at line 369 of file StreamSchedule.h.

Referenced by clearCounters(), and finishedPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private