CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr
< HLTGlobalStatus const > 
TrigResConstPtr
 
typedef std::shared_ptr
< HLTGlobalStatus
TrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
void endPaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all end paths in the process More...
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription
const * > 
getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
 
template<typename T >
void processOneStream (typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void triggerPaths (std::vector< std::string > &oLabelsToFill) const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
 
void finishedPaths (std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
template<typename T >
void runEndPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 
template<typename T >
bool runTriggerPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
vstring empty_trig_path_names_
 
std::vector< int > empty_trig_paths_
 
vstring end_path_name_list_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
unsigned int number_of_unscheduled_modules_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
vstring trig_name_list_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 141 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 149 of file StreamSchedule.h.

Definition at line 145 of file StreamSchedule.h.

Definition at line 153 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 144 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 147 of file StreamSchedule.h.

Definition at line 146 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 143 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 148 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 151 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
BranchIDListHelper branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 133 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), assert(), end_path_name_list_, end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), diffTwoXMLs::label, number_of_unscheduled_modules_, geometryDiff::opts, results(), results_inserter_, edm::WorkerManager::setOnDemandProducts(), trig_name_list_, trig_paths_, edm::StreamID::value(), and workerManager_.

145  :
146  workerManager_(modReg,areg, actions),
147  actReg_(areg),
148  trig_name_list_(tns.getTrigPaths()),
149  end_path_name_list_(tns.getEndPaths()),
150  results_(new HLTGlobalStatus(trig_name_list_.size())),
152  trig_paths_(),
153  end_paths_(),
154  total_events_(),
155  total_passed_(),
158  streamContext_(streamID_, processContext),
159  endpathsAreActive_(true),
160  skippingEvent_(false){
161 
162  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
163  bool hasPath = false;
164 
165  int trig_bitpos = 0;
166  trig_paths_.reserve(trig_name_list_.size());
167  vstring labelsOnTriggerPaths;
168  for (auto const& trig_name : trig_name_list_) {
169  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), &labelsOnTriggerPaths);
170  ++trig_bitpos;
171  hasPath = true;
172  }
173 
174  if (hasPath) {
175  // the results inserter stands alone
176  inserter->setTrigResultForStream(streamID.value(), results());
177 
178  results_inserter_ = makeInserter(actions, actReg_, inserter);
180  }
181 
182  // fill normal endpaths
183  int bitpos = 0;
184  end_paths_.reserve(end_path_name_list_.size());
185  for (auto const& end_path_name : end_path_name_list_) {
186  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
187  ++bitpos;
188  }
189 
190  //See if all modules were used
191  std::set<std::string> usedWorkerLabels;
192  for (auto const& worker : allWorkers()) {
193  usedWorkerLabels.insert(worker->description().moduleLabel());
194  }
195  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
196  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197  std::vector<std::string> unusedLabels;
198  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199  usedWorkerLabels.begin(), usedWorkerLabels.end(),
200  back_inserter(unusedLabels));
201  //does the configuration say we should allow on demand?
202  bool allowUnscheduled = opts.getUntrackedParameter<bool>("allowUnscheduled", false);
203  std::set<std::string> unscheduledLabels;
204  std::vector<std::string> shouldBeUsedLabels;
205  if (!unusedLabels.empty()) {
206  //Need to
207  // 1) create worker
208  // 2) if it is a WorkerT<EDProducer>, add it to our list
209  // 3) hand list to our delayed reader
210  for (auto const& label : unusedLabels) {
211  if (allowUnscheduled) {
212  bool isTracked;
213  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
214  assert(isTracked);
215  assert(modulePSet != nullptr);
216  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
217  } else {
218  //everthing is marked are unused so no 'on demand' allowed
219  shouldBeUsedLabels.push_back(label);
220  }
221  }
222  if (!shouldBeUsedLabels.empty()) {
223  std::ostringstream unusedStream;
224  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
225  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
226  itLabelEnd = shouldBeUsedLabels.end();
227  itLabel != itLabelEnd;
228  ++itLabel) {
229  unusedStream << ",'" << *itLabel << "'";
230  }
231  LogInfo("path")
232  << "The following module labels are not assigned to any path:\n"
233  << unusedStream.str()
234  << "\n";
235  }
236  }
237  if (!unscheduledLabels.empty()) {
238  number_of_unscheduled_modules_=unscheduledLabels.size();
239  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
240  }
241 
242 
243  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
244 
245  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
vector< string > vstring
Definition: ExoticaDQM.cc:8
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
assert(m_qm.get())
processConfiguration
Definition: Schedule.cc:384
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
actions
Definition: Schedule.cc:384
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
areg
Definition: Schedule.cc:384
unsigned int value() const
Definition: StreamID.h:46
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
preg
Definition: Schedule.cc:384
TrigResConstPtr results() const
prealloc
Definition: Schedule.cc:384
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 287 of file StreamSchedule.h.

References edm::WorkerManager::actionTable(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

287  {
288  return workerManager_.actionTable();
289  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:70
void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

Definition at line 885 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and StreamSchedule().

885  {
887  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 254 of file StreamSchedule.h.

References edm::WorkerManager::allWorkers(), and workerManager_.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

254  {
255  return workerManager_.allWorkers();
256  }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:66
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 704 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

704  {
705  oLabelsToFill.reserve(trig_paths_.size());
706  std::transform(trig_paths_.begin(),
707  trig_paths_.end(),
708  std::back_inserter(oLabelsToFill),
709  std::bind(&Path::name, std::placeholders::_1));
710  }
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::beginStream ( )

Definition at line 510 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

510  {
512  }
WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 870 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

870  {
871  using std::placeholders::_1;
873  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
874  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
875  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
876  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:145
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void clearCounters()
Definition: Path.cc:168
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 262 of file StreamSchedule.h.

References streamContext_.

Referenced by runEndPaths(), and runTriggerPaths().

262 { return streamContext_;}
StreamContext streamContext_
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 798 of file StreamSchedule.cc.

References endpathsAreActive_.

798  {
799  endpathsAreActive_ = active;
800  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all end paths in the process

Definition at line 718 of file StreamSchedule.cc.

References end_path_name_list_.

718  {
719  oLabelsToFill = end_path_name_list_;
720  }
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 803 of file StreamSchedule.cc.

References endpathsAreActive_.

803  {
804  return endpathsAreActive_;
805  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 514 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

514  {
516  }
void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name 
)
private

Definition at line 489 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), end_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

493  {
494  using std::placeholders::_1;
495  PathWorkers tmpworkers;
496  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
497  Workers holder;
498 
499  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
500  holder.push_back(wi->getWorker());
501  }
502 
503  if (!tmpworkers.empty()) {
504  //EndPaths are not supposed to stop if SkipEvent type exception happens
505  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
506  }
507  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
508  }
std::vector< Worker * > Workers
std::shared_ptr< HLTGlobalStatus > TrigResPtr
processConfiguration
Definition: Schedule.cc:384
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
preg
Definition: Schedule.cc:384
prealloc
Definition: Schedule.cc:384
void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
vstring labelsOnTriggerPaths 
)
private

Definition at line 463 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_path_names_, empty_trig_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

468  {
469  using std::placeholders::_1;
470  PathWorkers tmpworkers;
471  Workers holder;
472  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
473 
474  for (PathWorkers::iterator wi(tmpworkers.begin()),
475  we(tmpworkers.end()); wi != we; ++wi) {
476  holder.push_back(wi->getWorker());
477  }
478 
479  // an empty path will cause an extra bit that is not used
480  if (!tmpworkers.empty()) {
481  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
482  } else {
483  empty_trig_paths_.push_back(bitpos);
484  empty_trig_path_names_.push_back(name);
485  }
486  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
487  }
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
processConfiguration
Definition: Schedule.cc:384
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
vstring empty_trig_path_names_
std::atomic< bool > skippingEvent_
preg
Definition: Schedule.cc:384
prealloc
Definition: Schedule.cc:384
void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
vstring labelsOnPaths 
)
private

Definition at line 404 of file StreamSchedule.cc.

References assert(), edm::errors::Configuration, edm::Worker::description(), end_path_name_list_, Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), mergeVDriftHistosByStation::name, edm::WorkerInPath::Normal, EgammaValidation_cff::pathName, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

411  {
412  vstring modnames = proc_pset.getParameter<vstring>(pathName);
413  PathWorkers tmpworkers;
414 
415  unsigned int placeInPath = 0;
416  for (auto const& name : modnames) {
417 
418  if (labelsOnPaths) labelsOnPaths->push_back(name);
419 
421  if (name[0] == '!') filterAction = WorkerInPath::Veto;
422  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
423 
424  std::string moduleLabel = name;
425  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
426 
427  bool isTracked;
428  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
429  if (modpset == 0) {
430  std::string pathType("endpath");
432  pathType = std::string("path");
433  }
435  "The unknown module label \"" << moduleLabel <<
436  "\" appears in " << pathType << " \"" << pathName <<
437  "\"\n please check spelling or remove that label from the path.";
438  }
439  assert(isTracked);
440 
441  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
442  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
443  // We have a filter on an end path, and the filter is not explicitly ignored.
444  // See if the filter is allowed.
445  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
446  if (!search_all(allowed_filters, worker->description().moduleName())) {
447  // Filter is not allowed. Ignore the result, and issue a warning.
448  filterAction = WorkerInPath::Ignore;
449  LogWarning("FilterOnEndPath")
450  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
451  << "The return value of the filter will be ignored.\n"
452  << "To suppress this warning, either remove the filter from the endpath,\n"
453  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
454  }
455  }
456  tmpworkers.emplace_back(worker, filterAction, placeInPath);
457  ++placeInPath;
458  }
459 
460  out.swap(tmpworkers);
461  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
assert(m_qm.get())
processConfiguration
Definition: Schedule.cc:384
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
preg
Definition: Schedule.cc:384
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
prealloc
Definition: Schedule.cc:384
void edm::StreamSchedule::finishedPaths ( std::exception_ptr  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal ep,
EventSetup const &  es 
)
private

Definition at line 590 of file StreamSchedule.cc.

References mps_fire::action, actionTable(), cms::Exception::addContext(), assert(), cms::Exception::category(), edm::WaitingTaskHolder::doneWaiting(), alignCSCRings::e, end_paths_, endpathsAreActive_, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), finishProcessOneEvent(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, edm::ServiceRegistry::instance(), edm::make_waiting_task(), cmsPerfStripChart::operate(), or, edm::ServiceRegistry::presentToken(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

591  {
592 
593  if(iExcept) {
594  try {
595  std::rethrow_exception(iExcept);
596  }
597  catch(cms::Exception& e) {
601  if (action == exception_actions::SkipEvent) {
602  edm::printCmsExceptionWarning("SkipEvent", e);
603  iExcept = std::exception_ptr();
604  } else {
605  iExcept = std::current_exception();
606  }
607  }
608  catch(...) {
609  iExcept = std::current_exception();
610  }
611  }
612 
613 
614  if((not iExcept) and results_->accept()) {
615  ++total_passed_;
616  }
617 
618  if((not iExcept) and (nullptr != results_inserter_.get())) {
619  try {
620  ParentContext parentContext(&streamContext_);
621  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
622 
623  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
624  }
625  catch (cms::Exception & ex) {
626  ex.addContext("Calling produce method for module TriggerResultInserter");
627  std::ostringstream ost;
628  ost << "Processing " << ep.id();
629  ex.addContext(ost.str());
630  iExcept = std::current_exception();
631  }
632  catch(...) {
633  iExcept = std::current_exception();
634  }
635  }
636  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
637  iExcept = finishProcessOneEvent(iExcept);
638  iWait.doneWaiting(iExcept);
639  } else {
640  auto serviceToken = ServiceRegistry::instance().presentToken();
641 
642  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
643  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
644  {
645  ServiceRegistry::Operate operate(serviceToken);
646 
647  std::exception_ptr ptr;
648  if(iPtr) {
649  ptr = *iPtr;
650  }
651  iWait.doneWaiting(finishProcessOneEvent(ptr));
652  });
653  //The holder guarantees that if the paths finish before the loop ends
654  // that we do not start too soon. It also guarantees that the task will
655  // run under that condition.
656  WaitingTaskHolder taskHolder(endPathsDone);
657  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
658  it != itEnd; ++it) {
659  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
660  }
661  }
662  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventIDconst &, edm::Timestampconst & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
volatile bool endpathsAreActive_
assert(m_qm.get())
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
ServiceToken presentToken() const
StreamContext streamContext_
static ServiceRegistry & instance()
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
string action
Definition: mps_fire.py:28
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 666 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by finishedPaths().

666  {
667  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
668 
669  if(iExcept) {
670  //add context information to the exception and print message
671  try {
672  convertException::wrap([&]() {
673  std::rethrow_exception(iExcept);
674  });
675  } catch(cms::Exception& ex) {
676  bool const cleaningUpAfterException = false;
677  if (ex.context().empty()) {
678  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
679  } else {
680  addContextAndPrintException("", ex, cleaningUpAfterException);
681  }
682  iExcept = std::current_exception();
683  }
684 
685  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
686  }
687 
688  try {
689  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
690  } catch(...) {
691  if(not iExcept) {
692  iExcept = std::current_exception();
693  }
694  }
695  if(not iExcept ) {
697  }
698 
699  return iExcept;
700  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 536 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

536  {
537  std::vector<ModuleDescription const*> result;
538  result.reserve(allWorkers().size());
539 
540  for (auto const& worker : allWorkers()) {
541  ModuleDescription const* p = worker->descPtr();
542  result.push_back(p);
543  }
544  return result;
545  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
tuple result
Definition: mps_fire.py:84
tuple size
Write out results.
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 858 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::sort_all(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

858  {
859  rep.eventSummary.totalEvents += totalEvents();
860  rep.eventSummary.totalEventsPassed += totalEventsPassed();
861  rep.eventSummary.totalEventsFailed += totalEventsFailed();
862 
863  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
864  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
865  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
866  sort_all(rep.workerSummaries);
867  }
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
void sort_all(RandomAccessSequence &s)
wrappers for std::sort
Definition: Algorithms.h:120
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 248 of file StreamSchedule.cc.

References allWorkers(), edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), newFWLiteAna::found, edm::pset::Registry::getMapped(), cmsHarvester::index, edm::InEvent, edm::pset::Registry::instance(), cmsLHEtoEOSManager::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, TrackValidation_cff::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, and w.

Referenced by StreamSchedule().

250  {
251  //for now, if have a subProcess, don't allow early delete
252  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
253  if(not allowEarlyDelete) return;
254 
255  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
256  // registered for this job
257  std::multimap<std::string,Worker*> branchToReadingWorker;
258  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
259 
260  //If no delete early items have been specified we don't have to do anything
261  if(branchToReadingWorker.size()==0) {
262  return;
263  }
264  const std::vector<std::string> kEmpty;
265  std::map<Worker*,unsigned int> reserveSizeForWorker;
266  unsigned int upperLimitOnReadingWorker =0;
267  unsigned int upperLimitOnIndicies = 0;
268  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
269 
270  //talk with output modules first
271  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
272  auto comm = iHolder->createOutputModuleCommunicator();
273  if (comm) {
274  if(branchToReadingWorker.size()>0) {
275  //If an OutputModule needs a product, we can't delete it early
276  // so we should remove it from our list
277  SelectedProductsForBranchType const& kept = comm->keptProducts();
278  for(auto const& item: kept[InEvent]) {
279  BranchDescription const& desc = *item.first;
280  auto found = branchToReadingWorker.equal_range(desc.branchName());
281  if(found.first !=found.second) {
282  --nUniqueBranchesToDelete;
283  branchToReadingWorker.erase(found.first,found.second);
284  }
285  }
286  }
287  }
288  });
289 
290  if(branchToReadingWorker.size()==0) {
291  return;
292  }
293 
294  for (auto w :allWorkers()) {
295  //determine if this module could read a branch we want to delete early
296  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
297  if(0!=pset) {
298  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
299  if(not branches.empty()) {
300  ++upperLimitOnReadingWorker;
301  }
302  for(auto const& branch:branches){
303  auto found = branchToReadingWorker.equal_range(branch);
304  if(found.first != found.second) {
305  ++upperLimitOnIndicies;
306  ++reserveSizeForWorker[w];
307  if(nullptr == found.first->second) {
308  found.first->second = w;
309  } else {
310  branchToReadingWorker.insert(make_pair(found.first->first,w));
311  }
312  }
313  }
314  }
315  }
316  {
317  auto it = branchToReadingWorker.begin();
318  std::vector<std::string> unusedBranches;
319  while(it !=branchToReadingWorker.end()) {
320  if(it->second == nullptr) {
321  unusedBranches.push_back(it->first);
322  //erasing the object invalidates the iterator so must advance it first
323  auto temp = it;
324  ++it;
325  branchToReadingWorker.erase(temp);
326  } else {
327  ++it;
328  }
329  }
330  if(not unusedBranches.empty()) {
331  LogWarning l("UnusedProductsForCanDeleteEarly");
332  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
333  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
334  for(auto const& n:unusedBranches){
335  l<<"\n "<<n;
336  }
337  }
338  }
339  if(0!=branchToReadingWorker.size()) {
340  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
341  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
342  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
343  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
344  std::string lastBranchName;
345  size_t nextOpenIndex = 0;
346  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
347  for(auto& branchAndWorker:branchToReadingWorker) {
348  if(lastBranchName != branchAndWorker.first) {
349  //have to put back the period we removed earlier in order to get the proper name
350  BranchID bid(branchAndWorker.first+".");
351  earlyDeleteBranchToCount_.emplace_back(bid,0U);
352  lastBranchName = branchAndWorker.first;
353  }
354  auto found = alreadySeenWorkers.find(branchAndWorker.second);
355  if(alreadySeenWorkers.end() == found) {
356  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
357  // all the branches that might be read by this worker. However, initially we will only tell the
358  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
359  // EarlyDeleteHelper will automatically advance its internal end pointer.
360  size_t index = nextOpenIndex;
361  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
363  earlyDeleteHelpers_.emplace_back(beginAddress+index,
364  beginAddress+index+1,
366  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
367  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
368  nextOpenIndex +=nIndices;
369  } else {
370  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
371  }
372  }
373 
374  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
375  // space needed for each module
376  auto itLast = earlyDeleteHelpers_.begin();
377  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
378  if(itLast->end() != it->begin()) {
379  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
380  unsigned int delta = it->begin()- itLast->end();
381  it->shiftIndexPointers(delta);
382 
384  (itLast->end()-beginAddress),
386  (it->begin()-beginAddress));
387  }
388  itLast = it;
389  }
390  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
392 
393  //now tell the paths about the deleters
394  for(auto& p : trig_paths_) {
395  p.setEarlyDeleteHelpers(alreadySeenWorkers);
396  }
397  for(auto& p : end_paths_) {
398  p.setEarlyDeleteHelpers(alreadySeenWorkers);
399  }
401  }
402  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
preg
Definition: Schedule.cc:384
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 769 of file StreamSchedule.cc.

References end_paths_, newFWLiteAna::found, i, and edm::Path::name().

771  {
772  descriptions.clear();
773  bool found = false;
774  TrigPaths::const_iterator itFound;
775 
776  if(hint < end_paths_.size()) {
777  itFound = end_paths_.begin() + hint;
778  if(itFound->name() == iEndPathLabel) found = true;
779  }
780  if(!found) {
781  // if the hint did not work, do it the slow way
782  itFound = std::find_if (end_paths_.begin(),
783  end_paths_.end(),
784  std::bind(std::equal_to<std::string>(),
785  iEndPathLabel,
786  std::bind(&Path::name, std::placeholders::_1)));
787  if (itFound != end_paths_.end()) found = true;
788  }
789  if (found) {
790  descriptions.reserve(itFound->size());
791  for (size_t i = 0; i < itFound->size(); ++i) {
792  descriptions.push_back(itFound->getWorker(i)->descPtr());
793  }
794  }
795  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 740 of file StreamSchedule.cc.

References newFWLiteAna::found, i, edm::Path::name(), and trig_paths_.

742  {
743  descriptions.clear();
744  bool found = false;
745  TrigPaths::const_iterator itFound;
746 
747  if(hint < trig_paths_.size()) {
748  itFound = trig_paths_.begin() + hint;
749  if(itFound->name() == iPathLabel) found = true;
750  }
751  if(!found) {
752  // if the hint did not work, do it the slow way
753  itFound = std::find_if (trig_paths_.begin(),
754  trig_paths_.end(),
755  std::bind(std::equal_to<std::string>(),
756  iPathLabel,
757  std::bind(&Path::name, std::placeholders::_1)));
758  if (itFound != trig_paths_.end()) found = true;
759  }
760  if (found) {
761  descriptions.reserve(itFound->size());
762  for (size_t i = 0; i < itFound->size(); ++i) {
763  descriptions.push_back(itFound->getWorker(i)->descPtr());
764  }
765  }
766  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 723 of file StreamSchedule.cc.

References i, edm::Path::name(), and trig_paths_.

724  {
725  TrigPaths::const_iterator itFound =
726  std::find_if (trig_paths_.begin(),
727  trig_paths_.end(),
728  std::bind(std::equal_to<std::string>(),
729  iPathLabel,
730  std::bind(&Path::name, std::placeholders::_1)));
731  if (itFound!=trig_paths_.end()) {
732  oLabelsToFill.reserve(itFound->size());
733  for (size_t i = 0; i < itFound->size(); ++i) {
734  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
735  }
736  }
737  }
int i
Definition: DBlmapReader.cc:9
std::string const & name() const
Definition: Path.h:68
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 258 of file StreamSchedule.h.

References number_of_unscheduled_modules_.

258  {
260  }
unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetup const &  es 
)

Definition at line 547 of file StreamSchedule.cc.

References actReg_, empty_trig_paths_, finishedPaths(), edm::ServiceRegistry::instance(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), edm::hlt::Pass, edm::ServiceRegistry::presentToken(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

550  {
551  this->resetAll();
552  for (int empty_trig_path : empty_trig_paths_) {
553  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
554  }
555 
556  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
557 
558  Traits::setStreamContext(streamContext_, ep);
559  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
560 
561  // This call takes care of the unscheduled processing.
563 
564  ++total_events_;
565  auto serviceToken = ServiceRegistry::instance().presentToken();
566  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
567  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
568  {
569  ServiceRegistry::Operate operate(serviceToken);
570 
571  std::exception_ptr ptr;
572  if(iPtr) {
573  ptr = *iPtr;
574  }
575  finishedPaths(ptr, std::move(iTask), ep, es);
576  });
577 
578  //The holder guarantees that if the paths finish before the loop ends
579  // that we do not start too soon. It also guarantees that the task will
580  // run under that condition.
581  WaitingTaskHolder taskHolder(pathsDone);
582 
583  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
584  it != itEnd; ++ it) {
585  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
586  }
587  }
std::vector< int > empty_trig_paths_
ServiceToken presentToken() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
accept
Definition: HLTenums.h:19
StreamContext streamContext_
def move
Definition: eostools.py:510
static ServiceRegistry & instance()
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
template<typename T >
void edm::StreamSchedule::processOneStream ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 382 of file StreamSchedule.h.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), endpathsAreActive_, edm::WorkerManager::processOneOccurrence(), resetAll(), streamContext_, streamID_, workerManager_, and edm::convertException::wrap().

384  {
385  this->resetAll();
386 
387  T::setStreamContext(streamContext_, ep);
388  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
389 
390  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
391 
392  // This call takes care of the unscheduled processing.
393  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
394 
395  try {
396  convertException::wrap([&]() {
397  runTriggerPaths<T>(ep, es, &streamContext_);
398 
399  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
400  });
401  }
402  catch(cms::Exception& ex) {
403  if (ex.context().empty()) {
404  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
405  } else {
406  addContextAndPrintException("", ex, cleaningUpAfterException);
407  }
408  throw;
409  }
410  terminationSentry.completedSuccessfully();
411 
412  //If we got here no other exception has happened so we can propogate any Service related exceptions
413  sentry.allowThrow();
414  }
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:91
volatile bool endpathsAreActive_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 518 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), newFWLiteAna::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

519  {
520  Worker* found = nullptr;
521  for (auto const& worker : allWorkers()) {
522  if (worker->description().moduleLabel() == iLabel) {
523  found = worker;
524  break;
525  }
526  }
527  if (nullptr == found) {
528  return;
529  }
530 
531  iMod->replaceModuleFor(found);
532  found->beginStream(streamID_,streamContext_);
533  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 376 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

376  {
377  Service<JobReport> reportSvc;
378  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
379  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 879 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync(), and processOneStream().

879  {
880  skippingEvent_ = false;
881  results_->reset();
882  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 890 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, and cmsHarvester::index.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

890  {
891  //must be sure we have cleared the count first
892  for(auto& count:earlyDeleteBranchToCount_) {
893  count.count = 0;
894  }
895  //now reset based on how many helpers use that branch
897  ++(earlyDeleteBranchToCount_[index].count);
898  }
899  for(auto& helper: earlyDeleteHelpers_) {
900  helper.reset();
901  }
902  }
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 332 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

Referenced by StreamSchedule().

332 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 333 of file StreamSchedule.h.

References edm::get_underlying_safe(), and results_.

333 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
template<typename T >
void edm::StreamSchedule::runEndPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 427 of file StreamSchedule.h.

References context(), end_paths_, AlCaHLTBitMon_ParallelJobs::p, and streamID_.

427  {
428  // Note there is no state-checking safety controlling the
429  // activation/deactivation of endpaths.
430  for(auto& p : end_paths_) {
431  p.processOneOccurrence<T>(ep, es, streamID_, context);
432  }
433  }
StreamContext const & context() const
long double T
template<typename T >
bool edm::StreamSchedule::runTriggerPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 418 of file StreamSchedule.h.

References context(), AlCaHLTBitMon_ParallelJobs::p, results_, streamID_, and trig_paths_.

418  {
419  for(auto& p : trig_paths_) {
420  p.processOneOccurrence<T>(ep, es, streamID_, context);
421  }
422  return results_->accept();
423  }
edm::propagate_const< TrigResPtr > results_
StreamContext const & context() const
long double T
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 183 of file StreamSchedule.h.

References streamID_.

183 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 219 of file StreamSchedule.h.

References total_events_.

Referenced by getTriggerReport(), and totalEventsFailed().

219  {
220  return total_events_;
221  }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 231 of file StreamSchedule.h.

References totalEvents(), and totalEventsPassed().

Referenced by getTriggerReport().

231  {
232  return totalEvents() - totalEventsPassed();
233  }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 225 of file StreamSchedule.h.

References total_passed_.

Referenced by getTriggerReport(), and totalEventsFailed().

225  {
226  return total_passed_;
227  }
void edm::StreamSchedule::triggerPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all trigger paths in the process this is different from availablePaths because it includes the empty paths so matches the entries in TriggerResults exactly.

Definition at line 713 of file StreamSchedule.cc.

References trig_name_list_.

713  {
714  oLabelsToFill = trig_name_list_;
715  }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 352 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 362 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 359 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

vstring edm::StreamSchedule::empty_trig_path_names_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by fillTrigPath().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by fillTrigPath(), and processOneEventAsync().

vstring edm::StreamSchedule::end_path_name_list_
private

Definition at line 339 of file StreamSchedule.h.

Referenced by endPaths(), fillWorkers(), and StreamSchedule().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by enableEndPaths(), endPathsEnabled(), finishedPaths(), and processOneStream().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 366 of file StreamSchedule.h.

Referenced by numberOfUnscheduledModules(), and StreamSchedule().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private
edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 343 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 371 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 364 of file StreamSchedule.h.

Referenced by clearCounters(), processOneEventAsync(), and totalEvents().

int edm::StreamSchedule::total_passed_
private

Definition at line 365 of file StreamSchedule.h.

Referenced by clearCounters(), finishedPaths(), and totalEventsPassed().

vstring edm::StreamSchedule::trig_name_list_
private

Definition at line 338 of file StreamSchedule.h.

Referenced by StreamSchedule(), and triggerPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private