test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr
< HLTGlobalStatus const > 
TrigResConstPtr
 
typedef std::shared_ptr
< HLTGlobalStatus
TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
void enableEndPaths (bool active)
 
void endPaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all end paths in the process More...
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEvent (EventPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
template<typename T >
void processOneStream (typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void triggerPaths (std::vector< std::string > &oLabelsToFill) const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
template<typename T >
void runEndPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 
template<typename T >
bool runTriggerPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
vstring empty_trig_path_names_
 
std::vector< int > empty_trig_paths_
 
vstring end_path_name_list_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
unsigned int number_of_unscheduled_modules_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
vstring trig_name_list_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 140 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 148 of file StreamSchedule.h.

Definition at line 144 of file StreamSchedule.h.

Definition at line 152 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 143 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 146 of file StreamSchedule.h.

Definition at line 145 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 142 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 147 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 150 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
BranchIDListHelper branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 132 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), assert(), end_path_name_list_, end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), diffTwoXMLs::label, number_of_unscheduled_modules_, geometryDiff::opts, results(), results_inserter_, edm::WorkerManager::setOnDemandProducts(), trig_name_list_, trig_paths_, edm::StreamID::value(), and workerManager_.

144  :
145  workerManager_(modReg,areg, actions),
146  actReg_(areg),
147  trig_name_list_(tns.getTrigPaths()),
148  end_path_name_list_(tns.getEndPaths()),
149  results_(new HLTGlobalStatus(trig_name_list_.size())),
151  trig_paths_(),
152  end_paths_(),
153  total_events_(),
154  total_passed_(),
157  streamContext_(streamID_, processContext),
158  endpathsAreActive_(true),
159  skippingEvent_(false){
160 
161  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
162  bool hasPath = false;
163 
164  int trig_bitpos = 0;
165  trig_paths_.reserve(trig_name_list_.size());
166  vstring labelsOnTriggerPaths;
167  for (auto const& trig_name : trig_name_list_) {
168  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), &labelsOnTriggerPaths);
169  ++trig_bitpos;
170  hasPath = true;
171  }
172 
173  if (hasPath) {
174  // the results inserter stands alone
175  inserter->setTrigResultForStream(streamID.value(), results());
176 
177  results_inserter_ = makeInserter(actions, actReg_, inserter);
179  }
180 
181  // fill normal endpaths
182  int bitpos = 0;
183  end_paths_.reserve(end_path_name_list_.size());
184  for (auto const& end_path_name : end_path_name_list_) {
185  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
186  ++bitpos;
187  }
188 
189  //See if all modules were used
190  std::set<std::string> usedWorkerLabels;
191  for (auto const& worker : allWorkers()) {
192  usedWorkerLabels.insert(worker->description().moduleLabel());
193  }
194  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
195  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
196  std::vector<std::string> unusedLabels;
197  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
198  usedWorkerLabels.begin(), usedWorkerLabels.end(),
199  back_inserter(unusedLabels));
200  //does the configuration say we should allow on demand?
201  bool allowUnscheduled = opts.getUntrackedParameter<bool>("allowUnscheduled", false);
202  std::set<std::string> unscheduledLabels;
203  std::vector<std::string> shouldBeUsedLabels;
204  if (!unusedLabels.empty()) {
205  //Need to
206  // 1) create worker
207  // 2) if it is a WorkerT<EDProducer>, add it to our list
208  // 3) hand list to our delayed reader
209  for (auto const& label : unusedLabels) {
210  if (allowUnscheduled) {
211  bool isTracked;
212  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
213  assert(isTracked);
214  assert(modulePSet != nullptr);
215  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
216  } else {
217  //everthing is marked are unused so no 'on demand' allowed
218  shouldBeUsedLabels.push_back(label);
219  }
220  }
221  if (!shouldBeUsedLabels.empty()) {
222  std::ostringstream unusedStream;
223  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
224  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
225  itLabelEnd = shouldBeUsedLabels.end();
226  itLabel != itLabelEnd;
227  ++itLabel) {
228  unusedStream << ",'" << *itLabel << "'";
229  }
230  LogInfo("path")
231  << "The following module labels are not assigned to any path:\n"
232  << unusedStream.str()
233  << "\n";
234  }
235  }
236  if (!unscheduledLabels.empty()) {
237  number_of_unscheduled_modules_=unscheduledLabels.size();
238  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
239  }
240 
241 
242  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
243 
244  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
vector< string > vstring
Definition: ExoticaDQM.cc:8
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
assert(m_qm.get())
processConfiguration
Definition: Schedule.cc:383
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
actions
Definition: Schedule.cc:383
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
areg
Definition: Schedule.cc:383
unsigned int value() const
Definition: StreamID.h:46
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
preg
Definition: Schedule.cc:383
TrigResConstPtr results() const
prealloc
Definition: Schedule.cc:383
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 285 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and processOneEvent().

285  {
286  return workerManager_.actionTable();
287  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

Definition at line 825 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and StreamSchedule().

825  {
827  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 253 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

253  {
254  return workerManager_.allWorkers();
255  }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 644 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

644  {
645  oLabelsToFill.reserve(trig_paths_.size());
646  std::transform(trig_paths_.begin(),
647  trig_paths_.end(),
648  std::back_inserter(oLabelsToFill),
649  std::bind(&Path::name, std::placeholders::_1));
650  }
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::beginStream ( )

Definition at line 509 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

509  {
511  }
WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 810 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

810  {
811  using std::placeholders::_1;
813  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
814  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
815  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
816  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:145
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void clearCounters()
Definition: Path.cc:167
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 738 of file StreamSchedule.cc.

References endpathsAreActive_.

738  {
739  endpathsAreActive_ = active;
740  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all end paths in the process

Definition at line 658 of file StreamSchedule.cc.

References end_path_name_list_.

658  {
659  oLabelsToFill = end_path_name_list_;
660  }
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 743 of file StreamSchedule.cc.

References endpathsAreActive_.

743  {
744  return endpathsAreActive_;
745  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 513 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

513  {
515  }
void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name 
)
private

Definition at line 488 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), end_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

492  {
493  using std::placeholders::_1;
494  PathWorkers tmpworkers;
495  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
496  Workers holder;
497 
498  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
499  holder.push_back(wi->getWorker());
500  }
501 
502  if (!tmpworkers.empty()) {
503  //EndPaths are not supposed to stop if SkipEvent type exception happens
504  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
505  }
506  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
507  }
std::vector< Worker * > Workers
std::shared_ptr< HLTGlobalStatus > TrigResPtr
processConfiguration
Definition: Schedule.cc:383
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
preg
Definition: Schedule.cc:383
prealloc
Definition: Schedule.cc:383
void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
vstring labelsOnTriggerPaths 
)
private

Definition at line 462 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_path_names_, empty_trig_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

467  {
468  using std::placeholders::_1;
469  PathWorkers tmpworkers;
470  Workers holder;
471  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
472 
473  for (PathWorkers::iterator wi(tmpworkers.begin()),
474  we(tmpworkers.end()); wi != we; ++wi) {
475  holder.push_back(wi->getWorker());
476  }
477 
478  // an empty path will cause an extra bit that is not used
479  if (!tmpworkers.empty()) {
480  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
481  } else {
482  empty_trig_paths_.push_back(bitpos);
483  empty_trig_path_names_.push_back(name);
484  }
485  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
486  }
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
processConfiguration
Definition: Schedule.cc:383
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
vstring empty_trig_path_names_
std::atomic< bool > skippingEvent_
preg
Definition: Schedule.cc:383
prealloc
Definition: Schedule.cc:383
void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
vstring labelsOnPaths 
)
private

Definition at line 403 of file StreamSchedule.cc.

References assert(), edm::errors::Configuration, edm::Worker::description(), end_path_name_list_, Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), mergeVDriftHistosByStation::name, edm::WorkerInPath::Normal, EgammaValidation_cff::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

410  {
411  vstring modnames = proc_pset.getParameter<vstring>(pathName);
412  PathWorkers tmpworkers;
413 
414  unsigned int placeInPath = 0;
415  for (auto const& name : modnames) {
416 
417  if (labelsOnPaths) labelsOnPaths->push_back(name);
418 
420  if (name[0] == '!') filterAction = WorkerInPath::Veto;
421  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
422 
423  std::string moduleLabel = name;
424  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
425 
426  bool isTracked;
427  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
428  if (modpset == 0) {
429  std::string pathType("endpath");
431  pathType = std::string("path");
432  }
434  "The unknown module label \"" << moduleLabel <<
435  "\" appears in " << pathType << " \"" << pathName <<
436  "\"\n please check spelling or remove that label from the path.";
437  }
438  assert(isTracked);
439 
440  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
441  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
442  // We have a filter on an end path, and the filter is not explicitly ignored.
443  // See if the filter is allowed.
444  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
445  if (!search_all(allowed_filters, worker->description().moduleName())) {
446  // Filter is not allowed. Ignore the result, and issue a warning.
447  filterAction = WorkerInPath::Ignore;
448  LogWarning("FilterOnEndPath")
449  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
450  << "The return value of the filter will be ignored.\n"
451  << "To suppress this warning, either remove the filter from the endpath,\n"
452  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
453  }
454  }
455  tmpworkers.emplace_back(worker, filterAction, placeInPath);
456  ++placeInPath;
457  }
458 
459  out.swap(tmpworkers);
460  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
assert(m_qm.get())
processConfiguration
Definition: Schedule.cc:383
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
preg
Definition: Schedule.cc:383
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
prealloc
Definition: Schedule.cc:383
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 535 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

535  {
536  std::vector<ModuleDescription const*> result;
537  result.reserve(allWorkers().size());
538 
539  for (auto const& worker : allWorkers()) {
540  ModuleDescription const* p = worker->descPtr();
541  result.push_back(p);
542  }
543  return result;
544  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
tuple result
Definition: mps_fire.py:84
tuple size
Write out results.
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 798 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::sort_all(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

798  {
799  rep.eventSummary.totalEvents += totalEvents();
800  rep.eventSummary.totalEventsPassed += totalEventsPassed();
801  rep.eventSummary.totalEventsFailed += totalEventsFailed();
802 
803  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
804  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
805  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
806  sort_all(rep.workerSummaries);
807  }
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 247 of file StreamSchedule.cc.

References allWorkers(), edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), cmsHarvester::index, edm::InEvent, edm::pset::Registry::instance(), cmsLHEtoEOSManager::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, TrackValidation_cff::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, and w.

Referenced by StreamSchedule().

249  {
250  //for now, if have a subProcess, don't allow early delete
251  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
252  if(not allowEarlyDelete) return;
253 
254  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
255  // registered for this job
256  std::multimap<std::string,Worker*> branchToReadingWorker;
257  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
258 
259  //If no delete early items have been specified we don't have to do anything
260  if(branchToReadingWorker.size()==0) {
261  return;
262  }
263  const std::vector<std::string> kEmpty;
264  std::map<Worker*,unsigned int> reserveSizeForWorker;
265  unsigned int upperLimitOnReadingWorker =0;
266  unsigned int upperLimitOnIndicies = 0;
267  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
268 
269  //talk with output modules first
270  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
271  auto comm = iHolder->createOutputModuleCommunicator();
272  if (comm) {
273  if(branchToReadingWorker.size()>0) {
274  //If an OutputModule needs a product, we can't delete it early
275  // so we should remove it from our list
276  SelectedProductsForBranchType const& kept = comm->keptProducts();
277  for(auto const& item: kept[InEvent]) {
278  BranchDescription const& desc = *item.first;
279  auto found = branchToReadingWorker.equal_range(desc.branchName());
280  if(found.first !=found.second) {
281  --nUniqueBranchesToDelete;
282  branchToReadingWorker.erase(found.first,found.second);
283  }
284  }
285  }
286  }
287  });
288 
289  if(branchToReadingWorker.size()==0) {
290  return;
291  }
292 
293  for (auto w :allWorkers()) {
294  //determine if this module could read a branch we want to delete early
295  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
296  if(0!=pset) {
297  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
298  if(not branches.empty()) {
299  ++upperLimitOnReadingWorker;
300  }
301  for(auto const& branch:branches){
302  auto found = branchToReadingWorker.equal_range(branch);
303  if(found.first != found.second) {
304  ++upperLimitOnIndicies;
305  ++reserveSizeForWorker[w];
306  if(nullptr == found.first->second) {
307  found.first->second = w;
308  } else {
309  branchToReadingWorker.insert(make_pair(found.first->first,w));
310  }
311  }
312  }
313  }
314  }
315  {
316  auto it = branchToReadingWorker.begin();
317  std::vector<std::string> unusedBranches;
318  while(it !=branchToReadingWorker.end()) {
319  if(it->second == nullptr) {
320  unusedBranches.push_back(it->first);
321  //erasing the object invalidates the iterator so must advance it first
322  auto temp = it;
323  ++it;
324  branchToReadingWorker.erase(temp);
325  } else {
326  ++it;
327  }
328  }
329  if(not unusedBranches.empty()) {
330  LogWarning l("UnusedProductsForCanDeleteEarly");
331  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
332  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
333  for(auto const& n:unusedBranches){
334  l<<"\n "<<n;
335  }
336  }
337  }
338  if(0!=branchToReadingWorker.size()) {
339  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
340  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
341  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
342  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
343  std::string lastBranchName;
344  size_t nextOpenIndex = 0;
345  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
346  for(auto& branchAndWorker:branchToReadingWorker) {
347  if(lastBranchName != branchAndWorker.first) {
348  //have to put back the period we removed earlier in order to get the proper name
349  BranchID bid(branchAndWorker.first+".");
350  earlyDeleteBranchToCount_.emplace_back(bid,0U);
351  lastBranchName = branchAndWorker.first;
352  }
353  auto found = alreadySeenWorkers.find(branchAndWorker.second);
354  if(alreadySeenWorkers.end() == found) {
355  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
356  // all the branches that might be read by this worker. However, initially we will only tell the
357  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
358  // EarlyDeleteHelper will automatically advance its internal end pointer.
359  size_t index = nextOpenIndex;
360  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
362  earlyDeleteHelpers_.emplace_back(beginAddress+index,
363  beginAddress+index+1,
365  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
366  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
367  nextOpenIndex +=nIndices;
368  } else {
369  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
370  }
371  }
372 
373  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
374  // space needed for each module
375  auto itLast = earlyDeleteHelpers_.begin();
376  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
377  if(itLast->end() != it->begin()) {
378  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
379  unsigned int delta = it->begin()- itLast->end();
380  it->shiftIndexPointers(delta);
381 
383  (itLast->end()-beginAddress),
385  (it->begin()-beginAddress));
386  }
387  itLast = it;
388  }
389  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
391 
392  //now tell the paths about the deleters
393  for(auto& p : trig_paths_) {
394  p.setEarlyDeleteHelpers(alreadySeenWorkers);
395  }
396  for(auto& p : end_paths_) {
397  p.setEarlyDeleteHelpers(alreadySeenWorkers);
398  }
400  }
401  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
preg
Definition: Schedule.cc:383
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 709 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, i, and edm::Path::name().

711  {
712  descriptions.clear();
713  bool found = false;
714  TrigPaths::const_iterator itFound;
715 
716  if(hint < end_paths_.size()) {
717  itFound = end_paths_.begin() + hint;
718  if(itFound->name() == iEndPathLabel) found = true;
719  }
720  if(!found) {
721  // if the hint did not work, do it the slow way
722  itFound = std::find_if (end_paths_.begin(),
723  end_paths_.end(),
724  std::bind(std::equal_to<std::string>(),
725  iEndPathLabel,
726  std::bind(&Path::name, std::placeholders::_1)));
727  if (itFound != end_paths_.end()) found = true;
728  }
729  if (found) {
730  descriptions.reserve(itFound->size());
731  for (size_t i = 0; i < itFound->size(); ++i) {
732  descriptions.push_back(itFound->getWorker(i)->descPtr());
733  }
734  }
735  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 680 of file StreamSchedule.cc.

References newFWLiteAna::found, i, edm::Path::name(), and trig_paths_.

682  {
683  descriptions.clear();
684  bool found = false;
685  TrigPaths::const_iterator itFound;
686 
687  if(hint < trig_paths_.size()) {
688  itFound = trig_paths_.begin() + hint;
689  if(itFound->name() == iPathLabel) found = true;
690  }
691  if(!found) {
692  // if the hint did not work, do it the slow way
693  itFound = std::find_if (trig_paths_.begin(),
694  trig_paths_.end(),
695  std::bind(std::equal_to<std::string>(),
696  iPathLabel,
697  std::bind(&Path::name, std::placeholders::_1)));
698  if (itFound != trig_paths_.end()) found = true;
699  }
700  if (found) {
701  descriptions.reserve(itFound->size());
702  for (size_t i = 0; i < itFound->size(); ++i) {
703  descriptions.push_back(itFound->getWorker(i)->descPtr());
704  }
705  }
706  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 663 of file StreamSchedule.cc.

References i, edm::Path::name(), and trig_paths_.

664  {
665  TrigPaths::const_iterator itFound =
666  std::find_if (trig_paths_.begin(),
667  trig_paths_.end(),
668  std::bind(std::equal_to<std::string>(),
669  iPathLabel,
670  std::bind(&Path::name, std::placeholders::_1)));
671  if (itFound!=trig_paths_.end()) {
672  oLabelsToFill.reserve(itFound->size());
673  for (size_t i = 0; i < itFound->size(); ++i) {
674  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
675  }
676  }
677  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 257 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

257  {
259  }
unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEvent ( EventPrincipal principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 546 of file StreamSchedule.cc.

References mps_fire::action, actionTable(), actReg_, cms::Exception::addContext(), edm::addContextAndPrintException(), assert(), cms::Exception::category(), cms::Exception::context(), alignCSCRings::e, empty_trig_paths_, end_paths_, endpathsAreActive_, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), edm::propagate_const< T >::get(), edm::exception_actions::IgnoreCompletely, edm::make_empty_waiting_task(), edm::hlt::Pass, edm::printCmsExceptionWarning(), edm::WorkerManager::processOneOccurrence(), resetAll(), resetEarlyDelete(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, total_events_, total_passed_, trig_paths_, workerManager_, and edm::convertException::wrap().

548  {
549  this->resetAll();
550  for (int empty_trig_path : empty_trig_paths_) {
551  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
552  }
553 
554  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
555 
556  Traits::setStreamContext(streamContext_, ep);
557  StreamScheduleSignalSentry<Traits> sentry(actReg_.get(), &streamContext_);
558 
559  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
560  // This call takes care of the unscheduled processing.
561  workerManager_.processOneOccurrence<Traits>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
562 
563  ++total_events_;
564  try {
565  convertException::wrap([&]() {
566 
567  try {
568  auto waitTask = edm::make_empty_waiting_task();
569  //set count to 2 since wait_for_all requires value to not go to 0
570  waitTask->set_ref_count(2);
571  //Begin process in reverse order since TBB is last in first out for tasks.
572  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
573  it != itEnd; ++ it) {
574  it->processOneOccurrenceAsync(waitTask.get(),ep, es, streamID_, &streamContext_);
575  }
576  waitTask->decrement_ref_count();
577  waitTask->wait_for_all();
578 
579  if(waitTask->exceptionPtr() != nullptr) {
580  std::rethrow_exception(*(waitTask->exceptionPtr()));
581  }
582  if(results_->accept()) {
583  ++total_passed_;
584  }
585 
586  }
587  catch(cms::Exception& e) {
591  if (action == exception_actions::SkipEvent) {
592  edm::printCmsExceptionWarning("SkipEvent", e);
593  } else {
594  throw;
595  }
596  }
597 
598  try {
599  ParentContext parentContext(&streamContext_);
600  if (results_inserter_.get()) results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
601  }
602  catch (cms::Exception & ex) {
603  ex.addContext("Calling produce method for module TriggerResultInserter");
604  std::ostringstream ost;
605  ost << "Processing " << ep.id();
606  ex.addContext(ost.str());
607  throw;
608  }
609 
610  if (endpathsAreActive_) {
611  auto waitTask = edm::make_empty_waiting_task();
612  //set count to 2 since wait_for_all requires value to not go to 0
613  waitTask->set_ref_count(2);
614  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
615  it != itEnd; ++it) {
616  it->processOneOccurrenceAsync(waitTask.get(),ep, es, streamID_, &streamContext_);
617  }
618  waitTask->decrement_ref_count();
619  waitTask->wait_for_all();
620 
621  if(waitTask->exceptionPtr() != nullptr) {
622  std::rethrow_exception(*(waitTask->exceptionPtr()));
623  }
624 
625  }
627  });
628  }
629  catch(cms::Exception& ex) {
630  if (ex.context().empty()) {
631  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
632  } else {
633  addContextAndPrintException("", ex, cleaningUpAfterException);
634  }
635  throw;
636  }
637  terminationSentry.completedSuccessfully();
638 
639  //If we got here no other exception has happened so we can propogate any Service related exceptions
640  sentry.allowThrow();
641  }
std::vector< int > empty_trig_paths_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:90
volatile bool endpathsAreActive_
assert(m_qm.get())
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
accept
Definition: HLTenums.h:19
std::unique_ptr< edm::EmptyWaitingTask, waitingtask::TaskDestroyer > make_empty_waiting_task()
Create an EmptyWaitingTask which will properly be destroyed.
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:227
string action
Definition: mps_fire.py:28
auto wrap(F iFunc) -> decltype(iFunc())
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
template<typename T >
void edm::StreamSchedule::processOneStream ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 376 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), endpathsAreActive_, edm::WorkerManager::processOneOccurrence(), resetAll(), streamContext_, streamID_, workerManager_, and edm::convertException::wrap().

378  {
379  this->resetAll();
380 
381  T::setStreamContext(streamContext_, ep);
382  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
383 
384  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
385 
386  // This call takes care of the unscheduled processing.
387  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
388 
389  try {
390  convertException::wrap([&]() {
391  runTriggerPaths<T>(ep, es, &streamContext_);
392 
393  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
394  });
395  }
396  catch(cms::Exception& ex) {
397  if (ex.context().empty()) {
398  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
399  } else {
400  addContextAndPrintException("", ex, cleaningUpAfterException);
401  }
402  throw;
403  }
404  terminationSentry.completedSuccessfully();
405 
406  //If we got here no other exception has happened so we can propogate any Service related exceptions
407  sentry.allowThrow();
408  }
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:90
volatile bool endpathsAreActive_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 517 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

518  {
519  Worker* found = nullptr;
520  for (auto const& worker : allWorkers()) {
521  if (worker->description().moduleLabel() == iLabel) {
522  found = worker;
523  break;
524  }
525  }
526  if (nullptr == found) {
527  return;
528  }
529 
530  iMod->replaceModuleFor(found);
531  found->beginStream(streamID_,streamContext_);
532  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 370 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

370  {
371  Service<JobReport> reportSvc;
372  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
373  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 819 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEvent(), and processOneStream().

819  {
820  skippingEvent_ = false;
821  results_->reset();
822  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 830 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, and cmsHarvester::index.

Referenced by initializeEarlyDelete(), and processOneEvent().

830  {
831  //must be sure we have cleared the count first
832  for(auto& count:earlyDeleteBranchToCount_) {
833  count.count = 0;
834  }
835  //now reset based on how many helpers use that branch
837  ++(earlyDeleteBranchToCount_[index].count);
838  }
839  for(auto& helper: earlyDeleteHelpers_) {
840  helper.reset();
841  }
842  }
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 326 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

326 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 327 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

327 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
template<typename T >
void edm::StreamSchedule::runEndPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 421 of file StreamSchedule.h.

References end_paths_, AlCaHLTBitMon_ParallelJobs::p, and streamID_.

421  {
422  // Note there is no state-checking safety controlling the
423  // activation/deactivation of endpaths.
424  for(auto& p : end_paths_) {
425  p.processOneOccurrence<T>(ep, es, streamID_, context);
426  }
427  }
long double T
template<typename T >
bool edm::StreamSchedule::runTriggerPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 412 of file StreamSchedule.h.

References AlCaHLTBitMon_ParallelJobs::p, results_, streamID_, and trig_paths_.

412  {
413  for(auto& p : trig_paths_) {
414  p.processOneOccurrence<T>(ep, es, streamID_, context);
415  }
416  return results_->accept();
417  }
edm::propagate_const< TrigResPtr > results_
long double T
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 182 of file StreamSchedule.h.

References streamID_.

182 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 218 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

218  {
219  return total_events_;
220  }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 230 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

230  {
231  return totalEvents() - totalEventsPassed();
232  }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 224 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

224  {
225  return total_passed_;
226  }
void edm::StreamSchedule::triggerPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all trigger paths in the process this is different from availablePaths because it includes the empty paths so matches the entries in TriggerResults exactly.

Definition at line 653 of file StreamSchedule.cc.

References trig_name_list_.

653  {
654  oLabelsToFill = trig_name_list_;
655  }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 356 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 353 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

vstring edm::StreamSchedule::empty_trig_path_names_
private

Definition at line 341 of file StreamSchedule.h.

Referenced by fillTrigPath().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 340 of file StreamSchedule.h.

Referenced by fillTrigPath(), and processOneEvent().

vstring edm::StreamSchedule::end_path_name_list_
private

Definition at line 333 of file StreamSchedule.h.

Referenced by endPaths(), fillWorkers(), and StreamSchedule().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private
unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 335 of file StreamSchedule.h.

Referenced by processOneEvent(), resetAll(), results(), and runTriggerPaths().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 337 of file StreamSchedule.h.

Referenced by processOneEvent(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 358 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEvent(), and totalEvents().

int edm::StreamSchedule::total_passed_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEvent(), and totalEventsPassed().

vstring edm::StreamSchedule::trig_name_list_
private

Definition at line 332 of file StreamSchedule.h.

Referenced by StreamSchedule(), and triggerPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private