CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetupImpl const &es, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
 
void finishedPaths (std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void makePathStatusInserters (std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
std::vector< int > empty_end_paths_
 
std::vector< int > empty_trig_paths_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
 
unsigned int number_of_unscheduled_modules_
 
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 150 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 158 of file StreamSchedule.h.

Definition at line 154 of file StreamSchedule.h.

Definition at line 162 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 153 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 156 of file StreamSchedule.h.

Definition at line 155 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 152 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 157 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 160 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService const &  tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
BranchIDListHelper branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 137 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::service::TriggerNamesService::getEndPaths(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::service::TriggerNamesService::getTrigPaths(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), label, makePathStatusInserters(), number_of_unscheduled_modules_, results(), results_inserter_, trig_paths_, edm::StreamID::value(), and workerManager_.

153  : workerManager_(modReg, areg, actions),
154  actReg_(areg),
155  results_(new HLTGlobalStatus(tns.getTrigPaths().size())),
157  trig_paths_(),
158  end_paths_(),
159  total_events_(),
160  total_passed_(),
163  streamContext_(streamID_, processContext),
164  endpathsAreActive_(true),
165  skippingEvent_(false) {
166  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
167  bool hasPath = false;
168  std::vector<std::string> const& pathNames = tns.getTrigPaths();
169  std::vector<std::string> const& endPathNames = tns.getEndPaths();
170 
171  int trig_bitpos = 0;
172  trig_paths_.reserve(pathNames.size());
173  for (auto const& trig_name : pathNames) {
174  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), endPathNames);
175  ++trig_bitpos;
176  hasPath = true;
177  }
178 
179  if (hasPath) {
180  // the results inserter stands alone
181  inserter->setTrigResultForStream(streamID.value(), results());
182 
183  results_inserter_ = makeInserter(actions, actReg_, inserter);
185  }
186 
187  // fill normal endpaths
188  int bitpos = 0;
189  end_paths_.reserve(endPathNames.size());
190  for (auto const& end_path_name : endPathNames) {
191  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name, endPathNames);
192  ++bitpos;
193  }
194 
195  makePathStatusInserters(pathStatusInserters, endPathStatusInserters, actions);
196 
197  //See if all modules were used
198  std::set<std::string> usedWorkerLabels;
199  for (auto const& worker : allWorkers()) {
200  usedWorkerLabels.insert(worker->description().moduleLabel());
201  }
202  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string>>("@all_modules"));
203  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
204  std::vector<std::string> unusedLabels;
205  set_difference(modulesInConfigSet.begin(),
206  modulesInConfigSet.end(),
207  usedWorkerLabels.begin(),
208  usedWorkerLabels.end(),
209  back_inserter(unusedLabels));
210  std::set<std::string> unscheduledLabels;
211  std::vector<std::string> shouldBeUsedLabels;
212  if (!unusedLabels.empty()) {
213  //Need to
214  // 1) create worker
215  // 2) if it is a WorkerT<EDProducer>, add it to our list
216  // 3) hand list to our delayed reader
217  for (auto const& label : unusedLabels) {
218  bool isTracked;
219  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
220  assert(isTracked);
221  assert(modulePSet != nullptr);
223  *modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
224  }
225  if (!shouldBeUsedLabels.empty()) {
226  std::ostringstream unusedStream;
227  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
228  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
229  itLabelEnd = shouldBeUsedLabels.end();
230  itLabel != itLabelEnd;
231  ++itLabel) {
232  unusedStream << ",'" << *itLabel << "'";
233  }
234  LogInfo("path") << "The following module labels are not assigned to any path:\n" << unusedStream.str() << "\n";
235  }
236  }
237  number_of_unscheduled_modules_ = unscheduledLabels.size();
238 
239  initializeEarlyDelete(*modReg, opts, preg, allowEarlyDelete);
240 
241  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
char const * label
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
unsigned int value() const
Definition: StreamID.h:42
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
TrigResConstPtr results() const
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 283 of file StreamSchedule.h.

References dataset::name, geometryDiff::opts, MillePedeFileConverter_cfg::out, AlCaHLTBitMon_QueryRunRegistry::string, and w.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

283 { return workerManager_.actionTable(); }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

Definition at line 887 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), makePathStatusInserters(), and StreamSchedule().

const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 255 of file StreamSchedule.h.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

255 { return workerManager_.allWorkers(); }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 739 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

739  {
740  oLabelsToFill.reserve(trig_paths_.size());
741  std::transform(trig_paths_.begin(),
742  trig_paths_.end(),
743  std::back_inserter(oLabelsToFill),
744  std::bind(&Path::name, std::placeholders::_1));
745  }
std::string const & name() const
Definition: Path.h:80
void edm::StreamSchedule::beginStream ( )

Definition at line 517 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 874 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

874  {
875  using std::placeholders::_1;
877  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
878  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
879  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
880  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:217
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void clearCounters()
Definition: Path.cc:164
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 259 of file StreamSchedule.h.

259 { return streamContext_; }
StreamContext streamContext_
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 818 of file StreamSchedule.cc.

References endpathsAreActive_.

818 { endpathsAreActive_ = active; }
volatile bool endpathsAreActive_
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 820 of file StreamSchedule.cc.

References endpathsAreActive_.

820 { return endpathsAreActive_; }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 519 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 488 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_end_paths_, end_paths_, fillWorkers(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

494  {
495  PathWorkers tmpworkers;
496  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, endPathNames);
497 
498  if (!tmpworkers.empty()) {
499  //EndPaths are not supposed to stop if SkipEvent type exception happens
500  end_paths_.emplace_back(bitpos,
501  name,
502  tmpworkers,
503  TrigResPtr(),
504  actionTable(),
505  actReg_,
507  nullptr,
509  } else {
510  empty_end_paths_.push_back(bitpos);
511  }
512  for (WorkerInPath const& workerInPath : tmpworkers) {
513  addToAllWorkers(workerInPath.getWorker());
514  }
515  }
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
StreamContext streamContext_
void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 458 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_paths_, fillWorkers(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

465  {
466  PathWorkers tmpworkers;
467  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, endPathNames);
468 
469  // an empty path will cause an extra bit that is not used
470  if (!tmpworkers.empty()) {
471  trig_paths_.emplace_back(bitpos,
472  name,
473  tmpworkers,
474  trptr,
475  actionTable(),
476  actReg_,
480  } else {
481  empty_trig_paths_.push_back(bitpos);
482  }
483  for (WorkerInPath const& workerInPath : tmpworkers) {
484  addToAllWorkers(workerInPath.getWorker());
485  }
486  }
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
StreamContext streamContext_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
std::vector< std::string > const &  endPathNames 
)
private

Definition at line 399 of file StreamSchedule.cc.

References edm::errors::Configuration, edm::Worker::description(), Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), dataset::name, edm::WorkerInPath::Normal, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

406  {
407  vstring modnames = proc_pset.getParameter<vstring>(pathName);
408  PathWorkers tmpworkers;
409 
410  unsigned int placeInPath = 0;
411  for (auto const& name : modnames) {
413  if (name[0] == '!')
414  filterAction = WorkerInPath::Veto;
415  else if (name[0] == '-')
416  filterAction = WorkerInPath::Ignore;
417 
418  std::string moduleLabel = name;
419  if (filterAction != WorkerInPath::Normal)
420  moduleLabel.erase(0, 1);
421 
422  bool isTracked;
423  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
424  if (modpset == nullptr) {
425  std::string pathType("endpath");
426  if (!search_all(endPathNames, pathName)) {
427  pathType = std::string("path");
428  }
430  << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
431  << "\"\n please check spelling or remove that label from the path.";
432  }
433  assert(isTracked);
434 
435  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
436  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
437  // We have a filter on an end path, and the filter is not explicitly ignored.
438  // See if the filter is allowed.
439  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
440  if (!search_all(allowed_filters, worker->description().moduleName())) {
441  // Filter is not allowed. Ignore the result, and issue a warning.
442  filterAction = WorkerInPath::Ignore;
443  LogWarning("FilterOnEndPath") << "The EDFilter '" << worker->description().moduleName()
444  << "' with module label '" << moduleLabel << "' appears on EndPath '"
445  << pathName << "'.\n"
446  << "The return value of the filter will be ignored.\n"
447  << "To suppress this warning, either remove the filter from the endpath,\n"
448  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
449  }
450  }
451  tmpworkers.emplace_back(worker, filterAction, placeInPath);
452  ++placeInPath;
453  }
454 
455  out.swap(tmpworkers);
456  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:36
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void edm::StreamSchedule::finishedPaths ( std::atomic< std::exception_ptr * > &  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal ep,
EventSetupImpl const &  es 
)
private

Definition at line 649 of file StreamSchedule.cc.

References writedatasetfile::action, actionTable(), cms::Exception::addContext(), cms::Exception::category(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

652  {
653  if (iExcept) {
654  try {
655  std::rethrow_exception(*(iExcept.load()));
656  } catch (cms::Exception& e) {
658  assert(action != exception_actions::IgnoreCompletely);
659  assert(action != exception_actions::FailPath);
660  if (action == exception_actions::SkipEvent) {
661  edm::printCmsExceptionWarning("SkipEvent", e);
662  *(iExcept.load()) = std::exception_ptr();
663  } else {
664  *(iExcept.load()) = std::current_exception();
665  }
666  } catch (...) {
667  *(iExcept.load()) = std::current_exception();
668  }
669  }
670 
671  if ((not iExcept) and results_->accept()) {
672  ++total_passed_;
673  }
674 
675  if (nullptr != results_inserter_.get()) {
676  try {
677  //Even if there was an exception, we need to allow results inserter
678  // to run since some module may be waiting on its results.
679  ParentContext parentContext(&streamContext_);
680  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
681 
682  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
683  } catch (cms::Exception& ex) {
684  if (not iExcept) {
685  if (ex.context().empty()) {
686  std::ostringstream ost;
687  ost << "Processing Event " << ep.id();
688  ex.addContext(ost.str());
689  }
690  iExcept.store(new std::exception_ptr(std::current_exception()));
691  }
692  } catch (...) {
693  if (not iExcept) {
694  iExcept.store(new std::exception_ptr(std::current_exception()));
695  }
696  }
697  }
698  std::exception_ptr ptr;
699  if (iExcept) {
700  ptr = *iExcept.load();
701  }
702  iWait.doneWaiting(ptr);
703  }
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:165
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 705 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by processOneEventAsync().

705  {
706  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
707 
708  if (iExcept) {
709  //add context information to the exception and print message
710  try {
711  convertException::wrap([&]() { std::rethrow_exception(iExcept); });
712  } catch (cms::Exception& ex) {
713  bool const cleaningUpAfterException = false;
714  if (ex.context().empty()) {
715  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
716  } else {
717  addContextAndPrintException("", ex, cleaningUpAfterException);
718  }
719  iExcept = std::current_exception();
720  }
721 
722  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
723  }
724 
725  try {
726  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
727  } catch (...) {
728  if (not iExcept) {
729  iExcept = std::current_exception();
730  }
731  }
732  if (not iExcept) {
734  }
735 
736  return iExcept;
737  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 537 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

537  {
538  std::vector<ModuleDescription const*> result;
539  result.reserve(allWorkers().size());
540 
541  for (auto const& worker : allWorkers()) {
542  ModuleDescription const* p = worker->descPtr();
543  result.push_back(p);
544  }
545  return result;
546  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 864 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

864  {
865  rep.eventSummary.totalEvents += totalEvents();
866  rep.eventSummary.totalEventsPassed += totalEventsPassed();
867  rep.eventSummary.totalEventsFailed += totalEventsFailed();
868 
869  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
870  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
871  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
872  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1190
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 243 of file StreamSchedule.cc.

References allWorkers(), MicroEventContent_cff::branch, edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), runEdmFileComparison::found, edm::pset::Registry::getMapped(), edm::InEvent, edm::pset::Registry::instance(), checklumidiff::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

246  {
247  //for now, if have a subProcess, don't allow early delete
248  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
249  if (not allowEarlyDelete)
250  return;
251 
252  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
253  // registered for this job
254  std::multimap<std::string, Worker*> branchToReadingWorker;
255  initializeBranchToReadingWorker(opts, preg, branchToReadingWorker);
256 
257  //If no delete early items have been specified we don't have to do anything
258  if (branchToReadingWorker.empty()) {
259  return;
260  }
261  const std::vector<std::string> kEmpty;
262  std::map<Worker*, unsigned int> reserveSizeForWorker;
263  unsigned int upperLimitOnReadingWorker = 0;
264  unsigned int upperLimitOnIndicies = 0;
265  unsigned int nUniqueBranchesToDelete = branchToReadingWorker.size();
266 
267  //talk with output modules first
268  modReg.forAllModuleHolders([&branchToReadingWorker, &nUniqueBranchesToDelete](maker::ModuleHolder* iHolder) {
269  auto comm = iHolder->createOutputModuleCommunicator();
270  if (comm) {
271  if (!branchToReadingWorker.empty()) {
272  //If an OutputModule needs a product, we can't delete it early
273  // so we should remove it from our list
274  SelectedProductsForBranchType const& kept = comm->keptProducts();
275  for (auto const& item : kept[InEvent]) {
276  BranchDescription const& desc = *item.first;
277  auto found = branchToReadingWorker.equal_range(desc.branchName());
278  if (found.first != found.second) {
279  --nUniqueBranchesToDelete;
280  branchToReadingWorker.erase(found.first, found.second);
281  }
282  }
283  }
284  }
285  });
286 
287  if (branchToReadingWorker.empty()) {
288  return;
289  }
290 
291  for (auto w : allWorkers()) {
292  //determine if this module could read a branch we want to delete early
293  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
294  if (nullptr != pset) {
295  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet", kEmpty);
296  if (not branches.empty()) {
297  ++upperLimitOnReadingWorker;
298  }
299  for (auto const& branch : branches) {
300  auto found = branchToReadingWorker.equal_range(branch);
301  if (found.first != found.second) {
302  ++upperLimitOnIndicies;
303  ++reserveSizeForWorker[w];
304  if (nullptr == found.first->second) {
305  found.first->second = w;
306  } else {
307  branchToReadingWorker.insert(make_pair(found.first->first, w));
308  }
309  }
310  }
311  }
312  }
313  {
314  auto it = branchToReadingWorker.begin();
315  std::vector<std::string> unusedBranches;
316  while (it != branchToReadingWorker.end()) {
317  if (it->second == nullptr) {
318  unusedBranches.push_back(it->first);
319  //erasing the object invalidates the iterator so must advance it first
320  auto temp = it;
321  ++it;
322  branchToReadingWorker.erase(temp);
323  } else {
324  ++it;
325  }
326  }
327  if (not unusedBranches.empty()) {
328  LogWarning l("UnusedProductsForCanDeleteEarly");
329  l << "The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
330  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
331  for (auto const& n : unusedBranches) {
332  l << "\n " << n;
333  }
334  }
335  }
336  if (!branchToReadingWorker.empty()) {
337  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
338  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies, 0);
339  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
340  std::map<const Worker*, EarlyDeleteHelper*> alreadySeenWorkers;
341  std::string lastBranchName;
342  size_t nextOpenIndex = 0;
343  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
344  for (auto& branchAndWorker : branchToReadingWorker) {
345  if (lastBranchName != branchAndWorker.first) {
346  //have to put back the period we removed earlier in order to get the proper name
347  BranchID bid(branchAndWorker.first + ".");
348  earlyDeleteBranchToCount_.emplace_back(bid, 0U);
349  lastBranchName = branchAndWorker.first;
350  }
351  auto found = alreadySeenWorkers.find(branchAndWorker.second);
352  if (alreadySeenWorkers.end() == found) {
353  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
354  // all the branches that might be read by this worker. However, initially we will only tell the
355  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
356  // EarlyDeleteHelper will automatically advance its internal end pointer.
357  size_t index = nextOpenIndex;
358  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
360  earlyDeleteHelpers_.emplace_back(beginAddress + index, beginAddress + index + 1, &earlyDeleteBranchToCount_);
361  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
362  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second, &(earlyDeleteHelpers_.back())));
363  nextOpenIndex += nIndices;
364  } else {
365  found->second->appendIndex(earlyDeleteBranchToCount_.size() - 1);
366  }
367  }
368 
369  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
370  // space needed for each module
371  auto itLast = earlyDeleteHelpers_.begin();
372  for (auto it = earlyDeleteHelpers_.begin() + 1; it != earlyDeleteHelpers_.end(); ++it) {
373  if (itLast->end() != it->begin()) {
374  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
375  unsigned int delta = it->begin() - itLast->end();
376  it->shiftIndexPointers(delta);
377 
379  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
380  earlyDeleteHelperToBranchIndicies_.begin() + (it->begin() - beginAddress));
381  }
382  itLast = it;
383  }
385  earlyDeleteHelperToBranchIndicies_.begin() + (itLast->end() - beginAddress),
387 
388  //now tell the paths about the deleters
389  for (auto& p : trig_paths_) {
390  p.setEarlyDeleteHelpers(alreadySeenWorkers);
391  }
392  for (auto& p : end_paths_) {
393  p.setEarlyDeleteHelpers(alreadySeenWorkers);
394  }
396  }
397  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::makePathStatusInserters ( std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters,
std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &  endPathStatusInserters,
ExceptionToActionTable const &  actions 
)
private

Definition at line 903 of file StreamSchedule.cc.

References actions, actReg_, addToAllWorkers(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, edm::get_underlying(), pathStatusInserterWorkers_, and trig_paths_.

Referenced by StreamSchedule().

906  {
907  int bitpos = 0;
908  unsigned int indexEmpty = 0;
909  unsigned int indexOfPath = 0;
910  for (auto& pathStatusInserter : pathStatusInserters) {
911  std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
912  WorkerPtr workerPtr(
913  new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
914  pathStatusInserterWorkers_.emplace_back(workerPtr);
915  workerPtr->setActivityRegistry(actReg_);
916  addToAllWorkers(workerPtr.get());
917 
918  // A little complexity here because a C++ Path object is not
919  // instantiated and put into end_paths if there are no modules
920  // on the configured path.
921  if (indexEmpty < empty_trig_paths_.size() && bitpos == empty_trig_paths_.at(indexEmpty)) {
922  ++indexEmpty;
923  } else {
924  trig_paths_.at(indexOfPath).setPathStatusInserter(inserterPtr.get(), workerPtr.get());
925  ++indexOfPath;
926  }
927  ++bitpos;
928  }
929 
930  bitpos = 0;
931  indexEmpty = 0;
932  indexOfPath = 0;
933  for (auto& endPathStatusInserter : endPathStatusInserters) {
934  std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
935  WorkerPtr workerPtr(
936  new edm::WorkerT<EndPathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
937  endPathStatusInserterWorkers_.emplace_back(workerPtr);
938  workerPtr->setActivityRegistry(actReg_);
939  addToAllWorkers(workerPtr.get());
940 
941  // A little complexity here because a C++ Path object is not
942  // instantiated and put into end_paths if there are no modules
943  // on the configured path.
944  if (indexEmpty < empty_end_paths_.size() && bitpos == empty_end_paths_.at(indexEmpty)) {
945  ++indexEmpty;
946  } else {
947  end_paths_.at(indexOfPath).setPathStatusInserter(nullptr, workerPtr.get());
948  ++indexOfPath;
949  }
950  ++bitpos;
951  }
952  }
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void addToAllWorkers(Worker *w)
std::shared_ptr< Worker > WorkerPtr
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
T & get_underlying(propagate_const< T > &)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 789 of file StreamSchedule.cc.

References end_paths_, runEdmFileComparison::found, mps_fire::i, and edm::Path::name().

791  {
792  descriptions.clear();
793  bool found = false;
794  TrigPaths::const_iterator itFound;
795 
796  if (hint < end_paths_.size()) {
797  itFound = end_paths_.begin() + hint;
798  if (itFound->name() == iEndPathLabel)
799  found = true;
800  }
801  if (!found) {
802  // if the hint did not work, do it the slow way
803  itFound = std::find_if(
804  end_paths_.begin(),
805  end_paths_.end(),
806  std::bind(std::equal_to<std::string>(), iEndPathLabel, std::bind(&Path::name, std::placeholders::_1)));
807  if (itFound != end_paths_.end())
808  found = true;
809  }
810  if (found) {
811  descriptions.reserve(itFound->size());
812  for (size_t i = 0; i < itFound->size(); ++i) {
813  descriptions.push_back(itFound->getWorker(i)->descPtr());
814  }
815  }
816  }
std::string const & name() const
Definition: Path.h:80
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 760 of file StreamSchedule.cc.

References runEdmFileComparison::found, mps_fire::i, edm::Path::name(), and trig_paths_.

762  {
763  descriptions.clear();
764  bool found = false;
765  TrigPaths::const_iterator itFound;
766 
767  if (hint < trig_paths_.size()) {
768  itFound = trig_paths_.begin() + hint;
769  if (itFound->name() == iPathLabel)
770  found = true;
771  }
772  if (!found) {
773  // if the hint did not work, do it the slow way
774  itFound = std::find_if(
775  trig_paths_.begin(),
776  trig_paths_.end(),
777  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
778  if (itFound != trig_paths_.end())
779  found = true;
780  }
781  if (found) {
782  descriptions.reserve(itFound->size());
783  for (size_t i = 0; i < itFound->size(); ++i) {
784  descriptions.push_back(itFound->getWorker(i)->descPtr());
785  }
786  }
787  }
std::string const & name() const
Definition: Path.h:80
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 747 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

747  {
748  TrigPaths::const_iterator itFound = std::find_if(
749  trig_paths_.begin(),
750  trig_paths_.end(),
751  std::bind(std::equal_to<std::string>(), iPathLabel, std::bind(&Path::name, std::placeholders::_1)));
752  if (itFound != trig_paths_.end()) {
753  oLabelsToFill.reserve(itFound->size());
754  for (size_t i = 0; i < itFound->size(); ++i) {
755  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
756  }
757  }
758  }
std::string const & name() const
Definition: Path.h:80
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 257 of file StreamSchedule.h.

unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetupImpl const &  es,
ServiceToken const &  token,
std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &  pathStatusInserters 
)

Definition at line 548 of file StreamSchedule.cc.

References actReg_, edm::WaitingTaskHolder::doneWaiting(), empty_end_paths_, empty_trig_paths_, end_paths_, endPathStatusInserterWorkers_, finishedPaths(), finishProcessOneEvent(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), edm::hlt::Pass, pathStatusInserterWorkers_, edm::WorkerManager::processAccumulatorsAsync(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

553  {
554  try {
555  this->resetAll();
556 
557  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
558 
559  Traits::setStreamContext(streamContext_, ep);
560  //a service may want to communicate with another service
561  ServiceRegistry::Operate guard(serviceToken);
562  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
563 
564  HLTPathStatus hltPathStatus(hlt::Pass, 0);
565  for (int empty_trig_path : empty_trig_paths_) {
566  results_->at(empty_trig_path) = hltPathStatus;
567  pathStatusInserters[empty_trig_path]->setPathStatus(streamID_, hltPathStatus);
568  std::exception_ptr except = pathStatusInserterWorkers_[empty_trig_path]
569  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
570  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_);
571  if (except) {
572  iTask.doneWaiting(except);
573  return;
574  }
575  }
576  for (int empty_end_path : empty_end_paths_) {
577  std::exception_ptr except = endPathStatusInserterWorkers_[empty_end_path]
578  ->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
579  ep, es, streamID_, ParentContext(&streamContext_), &streamContext_);
580  if (except) {
581  iTask.doneWaiting(except);
582  return;
583  }
584  }
585 
586  // This call takes care of the unscheduled processing.
588 
589  ++total_events_;
590 
591  //use to give priorities on an error to ones from Paths
592  auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
593  auto pathErrorPtr = pathErrorHolder.get();
594  auto allPathsDone = make_waiting_task(
595  tbb::task::allocate_root(),
596  [iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
597  ServiceRegistry::Operate operate(serviceToken);
598 
599  std::exception_ptr ptr;
600  if (pathError->load()) {
601  ptr = *pathError->load();
602  delete pathError->load();
603  }
604  if ((not ptr) and iPtr) {
605  ptr = *iPtr;
606  }
607  iTask.doneWaiting(finishProcessOneEvent(ptr));
608  });
609  //The holder guarantees that if the paths finish before the loop ends
610  // that we do not start too soon. It also guarantees that the task will
611  // run under that condition.
612  WaitingTaskHolder allPathsHolder(allPathsDone);
613 
614  auto pathsDone = make_waiting_task(
615  tbb::task::allocate_root(),
616  [allPathsHolder, pathErrorPtr, &ep, &es, this, serviceToken](std::exception_ptr const* iPtr) mutable {
617  ServiceRegistry::Operate operate(serviceToken);
618 
619  if (iPtr) {
620  //this is used to prioritize this error over one
621  // that happens in EndPath or Accumulate
622  pathErrorPtr->store(new std::exception_ptr(*iPtr));
623  }
624  finishedPaths(*pathErrorPtr, std::move(allPathsHolder), ep, es);
625  });
626 
627  //The holder guarantees that if the paths finish before the loop ends
628  // that we do not start too soon. It also guarantees that the task will
629  // run under that condition.
630  WaitingTaskHolder taskHolder(pathsDone);
631 
632  //start end paths first so on single threaded the paths will run first
633  for (auto it = end_paths_.rbegin(), itEnd = end_paths_.rend(); it != itEnd; ++it) {
634  it->processOneOccurrenceAsync(allPathsDone, ep, es, serviceToken, streamID_, &streamContext_);
635  }
636 
637  for (auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend(); it != itEnd; ++it) {
638  it->processOneOccurrenceAsync(pathsDone, ep, es, serviceToken, streamID_, &streamContext_);
639  }
640 
641  ParentContext parentContext(&streamContext_);
642  workerManager_.processAccumulatorsAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
643  allPathsDone, ep, es, serviceToken, streamID_, parentContext, &streamContext_);
644  } catch (...) {
645  iTask.doneWaiting(std::current_exception());
646  }
647  }
std::vector< int > empty_trig_paths_
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< int > empty_end_paths_
void setupOnDemandSystem(Principal &principal, EventSetupImpl const &es)
accept
Definition: HLTenums.h:19
StreamContext streamContext_
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void processAccumulatorsAsync(WaitingTask *task, typename T::MyPrincipal const &ep, EventSetupImpl const &es, ServiceToken const &token, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventPrincipal &ep, EventSetupImpl const &es)
def move(src, dest)
Definition: eostools.py:511
def operate(timelog, memlog, json_f, num)
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::MyPrincipal &  principal,
EventSetupImpl const &  eventSetup,
ServiceToken const &  token,
bool  cleaningUpAfterException = false 
)

Definition at line 380 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, h, triggerObjects_cff::id, edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, TrackValidation_cff::task, and edm::convertException::wrap().

384  {
385  T::setStreamContext(streamContext_, ep);
386 
387  auto id = ep.id();
388  auto doneTask = make_waiting_task(
389  tbb::task::allocate_root(),
390  [this, iHolder, id, cleaningUpAfterException, token](std::exception_ptr const* iPtr) mutable {
391  std::exception_ptr excpt;
392  if (iPtr) {
393  excpt = *iPtr;
394  //add context information to the exception and print message
395  try {
396  convertException::wrap([&]() { std::rethrow_exception(excpt); });
397  } catch (cms::Exception& ex) {
398  //TODO: should add the transition type info
399  std::ostringstream ost;
400  if (ex.context().empty()) {
401  ost << "Processing " << T::transitionName() << " " << id;
402  }
403  ServiceRegistry::Operate op(token);
404  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
405  excpt = std::current_exception();
406  }
407 
408  ServiceRegistry::Operate op(token);
409  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
410  }
411 
412  try {
413  ServiceRegistry::Operate op(token);
414  T::postScheduleSignal(actReg_.get(), &streamContext_);
415  } catch (...) {
416  if (not excpt) {
417  excpt = std::current_exception();
418  }
419  }
420  iHolder.doneWaiting(excpt);
421  });
422 
423  auto task = make_functor_task(tbb::task::allocate_root(),
424  [this, doneTask, h = WaitingTaskHolder(doneTask), &ep, &es, token]() mutable {
425  ServiceRegistry::Operate op(token);
426  try {
427  T::preScheduleSignal(actReg_.get(), &streamContext_);
428 
430  } catch (...) {
431  h.doneWaiting(std::current_exception());
432  return;
433  }
434 
435  for (auto& p : end_paths_) {
436  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
437  }
438 
439  for (auto& p : trig_paths_) {
440  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
441  }
442 
444  doneTask, ep, es, token, streamID_, &streamContext_, &streamContext_);
445  });
446 
447  if (streamID_.value() == 0) {
448  //Enqueueing will start another thread if there is only
449  // one thread in the job. Having stream == 0 use spawn
450  // avoids starting up another thread when there is only one stream.
451  tbb::task::spawn(*task);
452  } else {
453  tbb::task::enqueue(*task);
454  }
455  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:147
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, StreamID streamID, typename T::Context const *topContext, U const *context)
unsigned int value() const
Definition: StreamID.h:42
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 521 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), runEdmFileComparison::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

521  {
522  Worker* found = nullptr;
523  for (auto const& worker : allWorkers()) {
524  if (worker->description().moduleLabel() == iLabel) {
525  found = worker;
526  break;
527  }
528  }
529  if (nullptr == found) {
530  return;
531  }
532 
533  iMod->replaceModuleFor(found);
534  found->beginStream(streamID_, streamContext_);
535  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 374 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

374  {
375  Service<JobReport> reportSvc;
376  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
377  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 882 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

882  {
883  skippingEvent_ = false;
884  results_->reset();
885  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 889 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, and earlyDeleteHelperToBranchIndicies_.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

889  {
890  //must be sure we have cleared the count first
891  for (auto& count : earlyDeleteBranchToCount_) {
892  count.count = 0;
893  }
894  //now reset based on how many helpers use that branch
896  ++(earlyDeleteBranchToCount_[index].count);
897  }
898  for (auto& helper : earlyDeleteHelpers_) {
899  helper.reset();
900  }
901  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 327 of file StreamSchedule.h.

References edm::get_underlying_safe().

Referenced by StreamSchedule().

327 { return get_underlying_safe(results_); }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 328 of file StreamSchedule.h.

References actions, and edm::get_underlying_safe().

328 { return get_underlying_safe(results_); }
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 199 of file StreamSchedule.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

199 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 226 of file StreamSchedule.h.

Referenced by getTriggerReport().

226 { return total_events_; }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 234 of file StreamSchedule.h.

References cuy::rep, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getTriggerReport().

234 { return totalEvents() - totalEventsPassed(); }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 230 of file StreamSchedule.h.

Referenced by getTriggerReport().

230 { return total_passed_; }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 352 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<int> edm::StreamSchedule::empty_end_paths_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by fillEndPath(), makePathStatusInserters(), and processOneEventAsync().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by fillTrigPath(), makePathStatusInserters(), and processOneEventAsync().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by enableEndPaths(), and endPathsEnabled().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::endPathStatusInserterWorkers_
private

Definition at line 342 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by StreamSchedule().

std::vector<edm::propagate_const<WorkerPtr> > edm::StreamSchedule::pathStatusInserterWorkers_
private

Definition at line 341 of file StreamSchedule.h.

Referenced by makePathStatusInserters(), and processOneEventAsync().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 338 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), and resetAll().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 340 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 371 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 364 of file StreamSchedule.h.

Referenced by clearCounters(), and processOneEventAsync().

int edm::StreamSchedule::total_passed_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by clearCounters(), and finishedPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private