CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Private Member Functions | Private Attributes
edm::StreamSchedule Class Reference

#include <StreamSchedule.h>

Classes

class  SendTerminationSignalIfException
 

Public Types

typedef std::vector< Worker * > AllWorkers
 
typedef std::vector< PathNonTrigPaths
 
typedef std::vector< WorkerInPathPathWorkers
 
typedef std::vector< PathTrigPaths
 
typedef std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
 
typedef std::shared_ptr< HLTGlobalStatusTrigResPtr
 
typedef std::vector< std::string > vstring
 
typedef std::shared_ptr< WorkerWorkerPtr
 
typedef std::vector< Worker * > Workers
 

Public Member Functions

AllWorkers const & allWorkers () const
 returns the collection of pointers to workers More...
 
void availablePaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all paths in the process More...
 
void beginStream ()
 
void clearCounters ()
 Clear all the counters in the trigger report. More...
 
StreamContext const & context () const
 
void enableEndPaths (bool active)
 
void endPaths (std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill the labels for all end paths in the process More...
 
bool endPathsEnabled () const
 
void endStream ()
 
std::vector< ModuleDescription const * > getAllModuleDescriptions () const
 
void getTriggerReport (TriggerReport &rep) const
 
void moduleDescriptionsInEndPath (std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void moduleDescriptionsInPath (std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
 
void modulesInPath (std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
 adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel More...
 
unsigned int numberOfUnscheduledModules () const
 
void processOneEventAsync (WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
 
template<typename T >
void processOneStream (typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
template<typename T >
void processOneStreamAsync (WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
 
void replaceModule (maker::ModuleHolder *iMod, std::string const &iLabel)
 clone the type of module with label iLabel but configure with iPSet. More...
 
StreamID streamID () const
 
 StreamSchedule (std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
 
 StreamSchedule (StreamSchedule const &)=delete
 
int totalEvents () const
 
int totalEventsFailed () const
 
int totalEventsPassed () const
 
void triggerPaths (std::vector< std::string > &oLabelsToFill) const
 

Private Member Functions

ExceptionToActionTable const & actionTable () const
 returns the action table More...
 
void addToAllWorkers (Worker *w)
 
void fillEndPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
 
void fillTrigPath (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
 
void fillWorkers (ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
 
void finishedPaths (std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
 
std::exception_ptr finishProcessOneEvent (std::exception_ptr)
 
void initializeEarlyDelete (ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
 
void reportSkipped (EventPrincipal const &ep) const
 
void resetAll ()
 
void resetEarlyDelete ()
 
TrigResConstPtr results () const
 
TrigResPtrresults ()
 
template<typename T >
void runEndPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 
template<typename T >
bool runTriggerPaths (typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
 

Private Attributes

std::shared_ptr< ActivityRegistryactReg_
 
std::vector< BranchToCountearlyDeleteBranchToCount_
 
std::vector< EarlyDeleteHelperearlyDeleteHelpers_
 
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
 
vstring empty_trig_path_names_
 
std::vector< int > empty_trig_paths_
 
vstring end_path_name_list_
 
TrigPaths end_paths_
 
volatile bool endpathsAreActive_
 
unsigned int number_of_unscheduled_modules_
 
edm::propagate_const< TrigResPtrresults_
 
edm::propagate_const< WorkerPtrresults_inserter_
 
std::atomic< bool > skippingEvent_
 
StreamContext streamContext_
 
StreamID streamID_
 
int total_events_
 
int total_passed_
 
vstring trig_name_list_
 
TrigPaths trig_paths_
 
WorkerManager workerManager_
 

Detailed Description

Definition at line 143 of file StreamSchedule.h.

Member Typedef Documentation

typedef std::vector<Worker*> edm::StreamSchedule::AllWorkers

Definition at line 151 of file StreamSchedule.h.

Definition at line 147 of file StreamSchedule.h.

Definition at line 155 of file StreamSchedule.h.

typedef std::vector<Path> edm::StreamSchedule::TrigPaths

Definition at line 146 of file StreamSchedule.h.

typedef std::shared_ptr<HLTGlobalStatus const> edm::StreamSchedule::TrigResConstPtr

Definition at line 149 of file StreamSchedule.h.

Definition at line 148 of file StreamSchedule.h.

typedef std::vector<std::string> edm::StreamSchedule::vstring

Definition at line 145 of file StreamSchedule.h.

typedef std::shared_ptr<Worker> edm::StreamSchedule::WorkerPtr

Definition at line 150 of file StreamSchedule.h.

typedef std::vector<Worker*> edm::StreamSchedule::Workers

Definition at line 153 of file StreamSchedule.h.

Constructor & Destructor Documentation

edm::StreamSchedule::StreamSchedule ( std::shared_ptr< TriggerResultInserter inserter,
std::shared_ptr< ModuleRegistry modReg,
ParameterSet proc_pset,
service::TriggerNamesService tns,
PreallocationConfiguration const &  prealloc,
ProductRegistry pregistry,
BranchIDListHelper branchIDListHelper,
ExceptionToActionTable const &  actions,
std::shared_ptr< ActivityRegistry areg,
std::shared_ptr< ProcessConfiguration processConfiguration,
bool  allowEarlyDelete,
StreamID  streamID,
ProcessContext const *  processContext 
)

Definition at line 133 of file StreamSchedule.cc.

References actReg_, addToAllWorkers(), edm::WorkerManager::addToUnscheduledWorkers(), allWorkers(), end_path_name_list_, end_paths_, fillEndPath(), fillTrigPath(), edm::propagate_const< T >::get(), edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameterSet(), initializeEarlyDelete(), diffTwoXMLs::label, number_of_unscheduled_modules_, geometryDiff::opts, results(), results_inserter_, edm::WorkerManager::setOnDemandProducts(), trig_name_list_, trig_paths_, edm::StreamID::value(), and workerManager_.

145  :
146  workerManager_(modReg,areg, actions),
147  actReg_(areg),
148  trig_name_list_(tns.getTrigPaths()),
149  end_path_name_list_(tns.getEndPaths()),
150  results_(new HLTGlobalStatus(trig_name_list_.size())),
152  trig_paths_(),
153  end_paths_(),
154  total_events_(),
155  total_passed_(),
158  streamContext_(streamID_, processContext),
159  endpathsAreActive_(true),
160  skippingEvent_(false){
161 
162  ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
163  bool hasPath = false;
164 
165  int trig_bitpos = 0;
166  trig_paths_.reserve(trig_name_list_.size());
167  vstring labelsOnTriggerPaths;
168  for (auto const& trig_name : trig_name_list_) {
169  fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results(), &labelsOnTriggerPaths);
170  ++trig_bitpos;
171  hasPath = true;
172  }
173 
174  if (hasPath) {
175  // the results inserter stands alone
176  inserter->setTrigResultForStream(streamID.value(), results());
177 
178  results_inserter_ = makeInserter(actions, actReg_, inserter);
180  }
181 
182  // fill normal endpaths
183  int bitpos = 0;
184  end_paths_.reserve(end_path_name_list_.size());
185  for (auto const& end_path_name : end_path_name_list_) {
186  fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name);
187  ++bitpos;
188  }
189 
190  //See if all modules were used
191  std::set<std::string> usedWorkerLabels;
192  for (auto const& worker : allWorkers()) {
193  usedWorkerLabels.insert(worker->description().moduleLabel());
194  }
195  std::vector<std::string> modulesInConfig(proc_pset.getParameter<std::vector<std::string> >("@all_modules"));
196  std::set<std::string> modulesInConfigSet(modulesInConfig.begin(), modulesInConfig.end());
197  std::vector<std::string> unusedLabels;
198  set_difference(modulesInConfigSet.begin(), modulesInConfigSet.end(),
199  usedWorkerLabels.begin(), usedWorkerLabels.end(),
200  back_inserter(unusedLabels));
201  std::set<std::string> unscheduledLabels;
202  std::vector<std::string> shouldBeUsedLabels;
203  if (!unusedLabels.empty()) {
204  //Need to
205  // 1) create worker
206  // 2) if it is a WorkerT<EDProducer>, add it to our list
207  // 3) hand list to our delayed reader
208  for (auto const& label : unusedLabels) {
209  bool isTracked;
210  ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
211  assert(isTracked);
212  assert(modulePSet != nullptr);
213  workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
214  }
215  if (!shouldBeUsedLabels.empty()) {
216  std::ostringstream unusedStream;
217  unusedStream << "'" << shouldBeUsedLabels.front() << "'";
218  for (std::vector<std::string>::iterator itLabel = shouldBeUsedLabels.begin() + 1,
219  itLabelEnd = shouldBeUsedLabels.end();
220  itLabel != itLabelEnd;
221  ++itLabel) {
222  unusedStream << ",'" << *itLabel << "'";
223  }
224  LogInfo("path")
225  << "The following module labels are not assigned to any path:\n"
226  << unusedStream.str()
227  << "\n";
228  }
229  }
230  if (!unscheduledLabels.empty()) {
231  number_of_unscheduled_modules_=unscheduledLabels.size();
232  workerManager_.setOnDemandProducts(preg, unscheduledLabels);
233  }
234 
235 
236  initializeEarlyDelete(*modReg, opts,preg,allowEarlyDelete);
237 
238  } // StreamSchedule::StreamSchedule
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
roAction_t actions[nactions]
Definition: GenABIO.cc:187
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
vector< string > vstring
Definition: ExoticaDQM.cc:8
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
void setOnDemandProducts(ProductRegistry &pregistry, std::set< std::string > const &unscheduledLabels) const
void addToUnscheduledWorkers(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration > processConfiguration, std::string label, std::set< std::string > &unscheduledLabels, std::vector< std::string > &shouldBeUsedLabels)
volatile bool endpathsAreActive_
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamID streamID() const
StreamContext streamContext_
element_type const * get() const
unsigned int value() const
Definition: StreamID.h:46
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
TrigResConstPtr results() const
edm::StreamSchedule::StreamSchedule ( StreamSchedule const &  )
delete

Member Function Documentation

ExceptionToActionTable const& edm::StreamSchedule::actionTable ( ) const
inlineprivate

returns the action table

Definition at line 295 of file StreamSchedule.h.

References dataset::name, geometryDiff::opts, MillePedeFileConverter_cfg::out, AlCaHLTBitMon_QueryRunRegistry::string, and w.

Referenced by fillEndPath(), fillTrigPath(), and finishedPaths().

295  {
296  return workerManager_.actionTable();
297  }
WorkerManager workerManager_
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:79
void edm::StreamSchedule::addToAllWorkers ( Worker w)
private

Definition at line 878 of file StreamSchedule.cc.

References edm::WorkerManager::addToAllWorkers(), and workerManager_.

Referenced by fillEndPath(), fillTrigPath(), and StreamSchedule().

878  {
880  }
const double w
Definition: UKUtility.cc:23
WorkerManager workerManager_
void addToAllWorkers(Worker *w)
AllWorkers const& edm::StreamSchedule::allWorkers ( ) const
inline

returns the collection of pointers to workers

Definition at line 262 of file StreamSchedule.h.

Referenced by clearCounters(), getAllModuleDescriptions(), getTriggerReport(), initializeEarlyDelete(), replaceModule(), and StreamSchedule().

262  {
263  return workerManager_.allWorkers();
264  }
WorkerManager workerManager_
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:75
void edm::StreamSchedule::availablePaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all paths in the process

Definition at line 698 of file StreamSchedule.cc.

References edm::Path::name(), create_public_lumi_plots::transform, and trig_paths_.

698  {
699  oLabelsToFill.reserve(trig_paths_.size());
700  std::transform(trig_paths_.begin(),
701  trig_paths_.end(),
702  std::back_inserter(oLabelsToFill),
703  std::bind(&Path::name, std::placeholders::_1));
704  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::beginStream ( )

Definition at line 503 of file StreamSchedule.cc.

References edm::WorkerManager::beginStream(), streamContext_, streamID_, and workerManager_.

503  {
505  }
WorkerManager workerManager_
StreamContext streamContext_
void beginStream(StreamID iID, StreamContext &streamContext)
void edm::StreamSchedule::clearCounters ( )

Clear all the counters in the trigger report.

Definition at line 863 of file StreamSchedule.cc.

References allWorkers(), edm::Path::clearCounters(), edm::Worker::clearCounters(), end_paths_, edm::for_all(), total_events_, total_passed_, and trig_paths_.

863  {
864  using std::placeholders::_1;
866  for_all(trig_paths_, std::bind(&Path::clearCounters, _1));
867  for_all(end_paths_, std::bind(&Path::clearCounters, _1));
868  for_all(allWorkers(), std::bind(&Worker::clearCounters, _1));
869  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void clearCounters()
Definition: Worker.h:154
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void clearCounters()
Definition: Path.cc:164
StreamContext const& edm::StreamSchedule::context ( ) const
inline

Definition at line 270 of file StreamSchedule.h.

270 { return streamContext_;}
StreamContext streamContext_
void edm::StreamSchedule::enableEndPaths ( bool  active)

Turn end_paths "off" if "active" is false; turn end_paths "on" if "active" is true.

Definition at line 792 of file StreamSchedule.cc.

References endpathsAreActive_.

792  {
793  endpathsAreActive_ = active;
794  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all end paths in the process

Definition at line 712 of file StreamSchedule.cc.

References end_path_name_list_.

712  {
713  oLabelsToFill = end_path_name_list_;
714  }
bool edm::StreamSchedule::endPathsEnabled ( ) const

Return true if end_paths are active, and false if they are inactive.

Definition at line 797 of file StreamSchedule.cc.

References endpathsAreActive_.

797  {
798  return endpathsAreActive_;
799  }
volatile bool endpathsAreActive_
void edm::StreamSchedule::endStream ( )

Definition at line 507 of file StreamSchedule.cc.

References edm::WorkerManager::endStream(), streamContext_, streamID_, and workerManager_.

507  {
509  }
void endStream(StreamID iID, StreamContext &streamContext)
WorkerManager workerManager_
StreamContext streamContext_
void edm::StreamSchedule::fillEndPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name 
)
private

Definition at line 482 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), end_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kEndPath, and streamContext_.

Referenced by StreamSchedule().

486  {
487  using std::placeholders::_1;
488  PathWorkers tmpworkers;
489  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0);
490  Workers holder;
491 
492  for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) {
493  holder.push_back(wi->getWorker());
494  }
495 
496  if (!tmpworkers.empty()) {
497  //EndPaths are not supposed to stop if SkipEvent type exception happens
498  end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, nullptr, PathContext::PathType::kEndPath);
499  }
500  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
501  }
std::vector< Worker * > Workers
std::shared_ptr< HLTGlobalStatus > TrigResPtr
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
void edm::StreamSchedule::fillTrigPath ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
int  bitpos,
std::string const &  name,
TrigResPtr  trptr,
vstring labelsOnTriggerPaths 
)
private

Definition at line 456 of file StreamSchedule.cc.

References actionTable(), actReg_, addToAllWorkers(), empty_trig_path_names_, empty_trig_paths_, fillWorkers(), edm::for_all(), edm::PathContext::kPath, skippingEvent_, streamContext_, and trig_paths_.

Referenced by StreamSchedule().

461  {
462  using std::placeholders::_1;
463  PathWorkers tmpworkers;
464  Workers holder;
465  fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths);
466 
467  for (PathWorkers::iterator wi(tmpworkers.begin()),
468  we(tmpworkers.end()); wi != we; ++wi) {
469  holder.push_back(wi->getWorker());
470  }
471 
472  // an empty path will cause an extra bit that is not used
473  if (!tmpworkers.empty()) {
474  trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, &skippingEvent_, PathContext::PathType::kPath);
475  } else {
476  empty_trig_paths_.push_back(bitpos);
477  empty_trig_path_names_.push_back(name);
478  }
479  for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
480  }
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void addToAllWorkers(Worker *w)
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< ActivityRegistry > actReg_
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
std::vector< WorkerInPath > PathWorkers
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
vstring empty_trig_path_names_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::fillWorkers ( ParameterSet proc_pset,
ProductRegistry preg,
PreallocationConfiguration const *  prealloc,
std::shared_ptr< ProcessConfiguration const >  processConfiguration,
std::string const &  name,
bool  ignoreFilters,
PathWorkers out,
vstring labelsOnPaths 
)
private

Definition at line 397 of file StreamSchedule.cc.

References edm::errors::Configuration, edm::Worker::description(), end_path_name_list_, Exception, edm::ParameterSet::getParameter(), edm::ParameterSet::getPSetForUpdate(), edm::ParameterSet::getUntrackedParameter(), edm::WorkerManager::getWorker(), edm::WorkerInPath::Ignore, edm::Worker::kFilter, edm::ModuleDescription::moduleName(), edm::Worker::moduleType(), dataset::name, edm::WorkerInPath::Normal, edm::search_all(), AlCaHLTBitMon_QueryRunRegistry::string, edm::WorkerInPath::Veto, and workerManager_.

Referenced by fillEndPath(), and fillTrigPath().

404  {
405  vstring modnames = proc_pset.getParameter<vstring>(pathName);
406  PathWorkers tmpworkers;
407 
408  unsigned int placeInPath = 0;
409  for (auto const& name : modnames) {
410 
411  if (labelsOnPaths) labelsOnPaths->push_back(name);
412 
414  if (name[0] == '!') filterAction = WorkerInPath::Veto;
415  else if (name[0] == '-') filterAction = WorkerInPath::Ignore;
416 
417  std::string moduleLabel = name;
418  if (filterAction != WorkerInPath::Normal) moduleLabel.erase(0, 1);
419 
420  bool isTracked;
421  ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
422  if (modpset == 0) {
423  std::string pathType("endpath");
424  if (!search_all(end_path_name_list_, pathName)) {
425  pathType = std::string("path");
426  }
428  "The unknown module label \"" << moduleLabel <<
429  "\" appears in " << pathType << " \"" << pathName <<
430  "\"\n please check spelling or remove that label from the path.";
431  }
432  assert(isTracked);
433 
434  Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
435  if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) {
436  // We have a filter on an end path, and the filter is not explicitly ignored.
437  // See if the filter is allowed.
438  std::vector<std::string> allowed_filters = proc_pset.getUntrackedParameter<vstring>("@filters_on_endpaths");
439  if (!search_all(allowed_filters, worker->description().moduleName())) {
440  // Filter is not allowed. Ignore the result, and issue a warning.
441  filterAction = WorkerInPath::Ignore;
442  LogWarning("FilterOnEndPath")
443  << "The EDFilter '" << worker->description().moduleName() << "' with module label '" << moduleLabel << "' appears on EndPath '" << pathName << "'.\n"
444  << "The return value of the filter will be ignored.\n"
445  << "To suppress this warning, either remove the filter from the endpath,\n"
446  << "or explicitly ignore it in the configuration by using cms.ignore().\n";
447  }
448  }
449  tmpworkers.emplace_back(worker, filterAction, placeInPath);
450  ++placeInPath;
451  }
452 
453  out.swap(tmpworkers);
454  }
vector< string > vstring
Definition: ExoticaDQM.cc:8
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
bool search_all(ForwardSequence const &s, Datum const &d)
Definition: Algorithms.h:46
Worker * getWorker(ParameterSet &pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &label)
void edm::StreamSchedule::finishedPaths ( std::exception_ptr  iExcept,
WaitingTaskHolder  iWait,
EventPrincipal ep,
EventSetup const &  es 
)
private

Definition at line 583 of file StreamSchedule.cc.

References mps_alisetup::action, actionTable(), cms::Exception::addContext(), cms::Exception::category(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), MillePedeFileConverter_cfg::e, end_paths_, endpathsAreActive_, edm::exception_actions::FailPath, edm::ExceptionToActionTable::find(), finishProcessOneEvent(), edm::propagate_const< T >::get(), edm::EventPrincipal::id(), edm::exception_actions::IgnoreCompletely, edm::ServiceRegistry::instance(), edm::make_waiting_task(), cmsPerfStripChart::operate(), or, edm::ServiceRegistry::presentToken(), edm::printCmsExceptionWarning(), results_, results_inserter_, edm::exception_actions::SkipEvent, streamContext_, streamID_, and total_passed_.

Referenced by processOneEventAsync().

584  {
585 
586  if(iExcept) {
587  try {
588  std::rethrow_exception(iExcept);
589  }
590  catch(cms::Exception& e) {
592  assert (action != exception_actions::IgnoreCompletely);
593  assert (action != exception_actions::FailPath);
594  if (action == exception_actions::SkipEvent) {
595  edm::printCmsExceptionWarning("SkipEvent", e);
596  iExcept = std::exception_ptr();
597  } else {
598  iExcept = std::current_exception();
599  }
600  }
601  catch(...) {
602  iExcept = std::current_exception();
603  }
604  }
605 
606 
607  if((not iExcept) and results_->accept()) {
608  ++total_passed_;
609  }
610 
611  if((not iExcept) and (nullptr != results_inserter_.get())) {
612  try {
613  ParentContext parentContext(&streamContext_);
614  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
615 
616  results_inserter_->doWork<Traits>(ep, es, streamID_, parentContext, &streamContext_);
617  }
618  catch (cms::Exception & ex) {
619  if(ex.context().empty()) {
620  std::ostringstream ost;
621  ost << "Processing Event " << ep.id();
622  ex.addContext(ost.str());
623  }
624  iExcept = std::current_exception();
625  }
626  catch(...) {
627  iExcept = std::current_exception();
628  }
629  }
630  if(end_paths_.empty() or iExcept or (not endpathsAreActive_)) {
631  iExcept = finishProcessOneEvent(iExcept);
632  iWait.doneWaiting(iExcept);
633  } else {
634  auto serviceToken = ServiceRegistry::instance().presentToken();
635 
636  auto endPathsDone = make_waiting_task(tbb::task::allocate_root(),
637  [iWait,this,serviceToken](std::exception_ptr const* iPtr) mutable
638  {
639  ServiceRegistry::Operate operate(serviceToken);
640 
641  std::exception_ptr ptr;
642  if(iPtr) {
643  ptr = *iPtr;
644  }
645  iWait.doneWaiting(finishProcessOneEvent(ptr));
646  });
647  //The holder guarantees that if the paths finish before the loop ends
648  // that we do not start too soon. It also guarantees that the task will
649  // run under that condition.
650  WaitingTaskHolder taskHolder(endPathsDone);
651  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
652  it != itEnd; ++it) {
653  it->processOneOccurrenceAsync(endPathsDone,ep, es, streamID_, &streamContext_);
654  }
655  }
656  }
volatile bool endpathsAreActive_
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
ServiceToken presentToken() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
element_type const * get() const
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
def operate(timelog, memlog, json_f, num)
std::exception_ptr edm::StreamSchedule::finishProcessOneEvent ( std::exception_ptr  iExcept)
private

Definition at line 660 of file StreamSchedule.cc.

References actReg_, edm::addContextAndPrintException(), cms::Exception::context(), edm::ExceptionFromThisContext, resetEarlyDelete(), streamContext_, and edm::convertException::wrap().

Referenced by finishedPaths().

660  {
661  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
662 
663  if(iExcept) {
664  //add context information to the exception and print message
665  try {
666  convertException::wrap([&]() {
667  std::rethrow_exception(iExcept);
668  });
669  } catch(cms::Exception& ex) {
670  bool const cleaningUpAfterException = false;
671  if (ex.context().empty()) {
672  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
673  } else {
674  addContextAndPrintException("", ex, cleaningUpAfterException);
675  }
676  iExcept = std::current_exception();
677  }
678 
679  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
680  }
681 
682  try {
683  Traits::postScheduleSignal(actReg_.get(), &streamContext_);
684  } catch(...) {
685  if(not iExcept) {
686  iExcept = std::current_exception();
687  }
688  }
689  if(not iExcept ) {
691  }
692 
693  return iExcept;
694  }
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
std::vector< ModuleDescription const * > edm::StreamSchedule::getAllModuleDescriptions ( ) const

Return a vector allowing const access to all the ModuleDescriptions for this StreamSchedule. *** N.B. *** Ownership of the ModuleDescriptions is not *** passed to the caller. Do not call delete on these *** pointers!

Definition at line 529 of file StreamSchedule.cc.

References allWorkers(), AlCaHLTBitMon_ParallelJobs::p, mps_fire::result, and findQualityFiles::size.

529  {
530  std::vector<ModuleDescription const*> result;
531  result.reserve(allWorkers().size());
532 
533  for (auto const& worker : allWorkers()) {
534  ModuleDescription const* p = worker->descPtr();
535  result.push_back(p);
536  }
537  return result;
538  }
size
Write out results.
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void edm::StreamSchedule::getTriggerReport ( TriggerReport rep) const

Return the trigger report information on paths, modules-in-path, modules-in-endpath, and modules.

Definition at line 852 of file StreamSchedule.cc.

References allWorkers(), end_paths_, edm::TriggerReport::endPathSummaries, edm::TriggerReport::eventSummary, edm::fillPathSummary(), edm::fillWorkerSummary(), edm::EventSummary::totalEvents, totalEvents(), edm::EventSummary::totalEventsFailed, totalEventsFailed(), edm::EventSummary::totalEventsPassed, totalEventsPassed(), trig_paths_, edm::TriggerReport::trigPathSummaries, and edm::TriggerReport::workerSummaries.

852  {
853  rep.eventSummary.totalEvents += totalEvents();
854  rep.eventSummary.totalEventsPassed += totalEventsPassed();
855  rep.eventSummary.totalEventsFailed += totalEventsFailed();
856 
857  fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathSummary);
858  fill_summary(end_paths_, rep.endPathSummaries, &fillPathSummary);
859  fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
860  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
int totalEventsFailed() const
int totalEvents() const
int totalEventsPassed() const
static void fillPathSummary(Path const &path, PathSummary &sum)
rep
Definition: cuy.py:1188
static void fillWorkerSummary(Worker const *pw, WorkerSummary &sum)
void edm::StreamSchedule::initializeEarlyDelete ( ModuleRegistry modReg,
edm::ParameterSet const &  opts,
edm::ProductRegistry const &  preg,
bool  allowEarlyDelete 
)
private

Definition at line 241 of file StreamSchedule.cc.

References allWorkers(), edm::BranchDescription::branchName(), edm::maker::ModuleHolder::createOutputModuleCommunicator(), delta, earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, end_paths_, edm::ModuleRegistry::forAllModuleHolders(), runEdmFileComparison::found, edm::pset::Registry::getMapped(), diffTreeTool::index, edm::InEvent, edm::pset::Registry::instance(), checklumidiff::l, gen::n, AlCaHLTBitMon_ParallelJobs::p, muonDTDigis_cfi::pset, resetEarlyDelete(), AlCaHLTBitMon_QueryRunRegistry::string, trig_paths_, mitigatedMETSequence_cff::U, and w.

Referenced by StreamSchedule().

243  {
244  //for now, if have a subProcess, don't allow early delete
245  // In the future we should use the SubProcess's 'keep list' to decide what can be kept
246  if(not allowEarlyDelete) return;
247 
248  //see if 'canDeleteEarly' was set and if so setup the list with those products actually
249  // registered for this job
250  std::multimap<std::string,Worker*> branchToReadingWorker;
251  initializeBranchToReadingWorker(opts,preg,branchToReadingWorker);
252 
253  //If no delete early items have been specified we don't have to do anything
254  if(branchToReadingWorker.size()==0) {
255  return;
256  }
257  const std::vector<std::string> kEmpty;
258  std::map<Worker*,unsigned int> reserveSizeForWorker;
259  unsigned int upperLimitOnReadingWorker =0;
260  unsigned int upperLimitOnIndicies = 0;
261  unsigned int nUniqueBranchesToDelete=branchToReadingWorker.size();
262 
263  //talk with output modules first
264  modReg.forAllModuleHolders([this, &branchToReadingWorker,&nUniqueBranchesToDelete](maker::ModuleHolder* iHolder){
265  auto comm = iHolder->createOutputModuleCommunicator();
266  if (comm) {
267  if(branchToReadingWorker.size()>0) {
268  //If an OutputModule needs a product, we can't delete it early
269  // so we should remove it from our list
270  SelectedProductsForBranchType const& kept = comm->keptProducts();
271  for(auto const& item: kept[InEvent]) {
272  BranchDescription const& desc = *item.first;
273  auto found = branchToReadingWorker.equal_range(desc.branchName());
274  if(found.first !=found.second) {
275  --nUniqueBranchesToDelete;
276  branchToReadingWorker.erase(found.first,found.second);
277  }
278  }
279  }
280  }
281  });
282 
283  if(branchToReadingWorker.size()==0) {
284  return;
285  }
286 
287  for (auto w :allWorkers()) {
288  //determine if this module could read a branch we want to delete early
289  auto pset = pset::Registry::instance()->getMapped(w->description().parameterSetID());
290  if(0!=pset) {
291  auto branches = pset->getUntrackedParameter<std::vector<std::string>>("mightGet",kEmpty);
292  if(not branches.empty()) {
293  ++upperLimitOnReadingWorker;
294  }
295  for(auto const& branch:branches){
296  auto found = branchToReadingWorker.equal_range(branch);
297  if(found.first != found.second) {
298  ++upperLimitOnIndicies;
299  ++reserveSizeForWorker[w];
300  if(nullptr == found.first->second) {
301  found.first->second = w;
302  } else {
303  branchToReadingWorker.insert(make_pair(found.first->first,w));
304  }
305  }
306  }
307  }
308  }
309  {
310  auto it = branchToReadingWorker.begin();
311  std::vector<std::string> unusedBranches;
312  while(it !=branchToReadingWorker.end()) {
313  if(it->second == nullptr) {
314  unusedBranches.push_back(it->first);
315  //erasing the object invalidates the iterator so must advance it first
316  auto temp = it;
317  ++it;
318  branchToReadingWorker.erase(temp);
319  } else {
320  ++it;
321  }
322  }
323  if(not unusedBranches.empty()) {
324  LogWarning l("UnusedProductsForCanDeleteEarly");
325  l<<"The following products in the 'canDeleteEarly' list are not used in this job and will be ignored.\n"
326  " If possible, remove the producer from the job or add the product to the producer's own 'mightGet' list.";
327  for(auto const& n:unusedBranches){
328  l<<"\n "<<n;
329  }
330  }
331  }
332  if(0!=branchToReadingWorker.size()) {
333  earlyDeleteHelpers_.reserve(upperLimitOnReadingWorker);
334  earlyDeleteHelperToBranchIndicies_.resize(upperLimitOnIndicies,0);
335  earlyDeleteBranchToCount_.reserve(nUniqueBranchesToDelete);
336  std::map<const Worker*,EarlyDeleteHelper*> alreadySeenWorkers;
337  std::string lastBranchName;
338  size_t nextOpenIndex = 0;
339  unsigned int* beginAddress = &(earlyDeleteHelperToBranchIndicies_.front());
340  for(auto& branchAndWorker:branchToReadingWorker) {
341  if(lastBranchName != branchAndWorker.first) {
342  //have to put back the period we removed earlier in order to get the proper name
343  BranchID bid(branchAndWorker.first+".");
344  earlyDeleteBranchToCount_.emplace_back(bid,0U);
345  lastBranchName = branchAndWorker.first;
346  }
347  auto found = alreadySeenWorkers.find(branchAndWorker.second);
348  if(alreadySeenWorkers.end() == found) {
349  //NOTE: we will set aside enough space in earlyDeleteHelperToBranchIndicies_ to accommodate
350  // all the branches that might be read by this worker. However, initially we will only tell the
351  // EarlyDeleteHelper about the first one. As additional branches are added via 'appendIndex' the
352  // EarlyDeleteHelper will automatically advance its internal end pointer.
353  size_t index = nextOpenIndex;
354  size_t nIndices = reserveSizeForWorker[branchAndWorker.second];
356  earlyDeleteHelpers_.emplace_back(beginAddress+index,
357  beginAddress+index+1,
359  branchAndWorker.second->setEarlyDeleteHelper(&(earlyDeleteHelpers_.back()));
360  alreadySeenWorkers.insert(std::make_pair(branchAndWorker.second,&(earlyDeleteHelpers_.back())));
361  nextOpenIndex +=nIndices;
362  } else {
363  found->second->appendIndex(earlyDeleteBranchToCount_.size()-1);
364  }
365  }
366 
367  //Now we can compactify the earlyDeleteHelperToBranchIndicies_ since we may have over estimated the
368  // space needed for each module
369  auto itLast = earlyDeleteHelpers_.begin();
370  for(auto it = earlyDeleteHelpers_.begin()+1;it != earlyDeleteHelpers_.end();++it) {
371  if(itLast->end() != it->begin()) {
372  //figure the offset for next Worker since it hasn't been moved yet so it has the original address
373  unsigned int delta = it->begin()- itLast->end();
374  it->shiftIndexPointers(delta);
375 
377  (itLast->end()-beginAddress),
379  (it->begin()-beginAddress));
380  }
381  itLast = it;
382  }
383  earlyDeleteHelperToBranchIndicies_.erase(earlyDeleteHelperToBranchIndicies_.begin()+(itLast->end()-beginAddress),
385 
386  //now tell the paths about the deleters
387  for(auto& p : trig_paths_) {
388  p.setEarlyDeleteHelpers(alreadySeenWorkers);
389  }
390  for(auto& p : end_paths_) {
391  p.setEarlyDeleteHelpers(alreadySeenWorkers);
392  }
394  }
395  }
dbl * delta
Definition: mlp_gen.cc:36
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
const double w
Definition: UKUtility.cc:23
std::array< SelectedProducts, NumBranchTypes > SelectedProductsForBranchType
std::vector< BranchToCount > earlyDeleteBranchToCount_
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:18
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
static Registry * instance()
Definition: Registry.cc:12
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void edm::StreamSchedule::moduleDescriptionsInEndPath ( std::string const &  iEndPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 763 of file StreamSchedule.cc.

References end_paths_, runEdmFileComparison::found, mps_fire::i, and edm::Path::name().

765  {
766  descriptions.clear();
767  bool found = false;
768  TrigPaths::const_iterator itFound;
769 
770  if(hint < end_paths_.size()) {
771  itFound = end_paths_.begin() + hint;
772  if(itFound->name() == iEndPathLabel) found = true;
773  }
774  if(!found) {
775  // if the hint did not work, do it the slow way
776  itFound = std::find_if (end_paths_.begin(),
777  end_paths_.end(),
778  std::bind(std::equal_to<std::string>(),
779  iEndPathLabel,
780  std::bind(&Path::name, std::placeholders::_1)));
781  if (itFound != end_paths_.end()) found = true;
782  }
783  if (found) {
784  descriptions.reserve(itFound->size());
785  for (size_t i = 0; i < itFound->size(); ++i) {
786  descriptions.push_back(itFound->getWorker(i)->descPtr());
787  }
788  }
789  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::moduleDescriptionsInPath ( std::string const &  iPathLabel,
std::vector< ModuleDescription const * > &  descriptions,
unsigned int  hint 
) const

Definition at line 734 of file StreamSchedule.cc.

References runEdmFileComparison::found, mps_fire::i, edm::Path::name(), and trig_paths_.

736  {
737  descriptions.clear();
738  bool found = false;
739  TrigPaths::const_iterator itFound;
740 
741  if(hint < trig_paths_.size()) {
742  itFound = trig_paths_.begin() + hint;
743  if(itFound->name() == iPathLabel) found = true;
744  }
745  if(!found) {
746  // if the hint did not work, do it the slow way
747  itFound = std::find_if (trig_paths_.begin(),
748  trig_paths_.end(),
749  std::bind(std::equal_to<std::string>(),
750  iPathLabel,
751  std::bind(&Path::name, std::placeholders::_1)));
752  if (itFound != trig_paths_.end()) found = true;
753  }
754  if (found) {
755  descriptions.reserve(itFound->size());
756  for (size_t i = 0; i < itFound->size(); ++i) {
757  descriptions.push_back(itFound->getWorker(i)->descPtr());
758  }
759  }
760  }
std::string const & name() const
Definition: Path.h:75
void edm::StreamSchedule::modulesInPath ( std::string const &  iPathLabel,
std::vector< std::string > &  oLabelsToFill 
) const

adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel

Definition at line 717 of file StreamSchedule.cc.

References mps_fire::i, edm::Path::name(), and trig_paths_.

718  {
719  TrigPaths::const_iterator itFound =
720  std::find_if (trig_paths_.begin(),
721  trig_paths_.end(),
722  std::bind(std::equal_to<std::string>(),
723  iPathLabel,
724  std::bind(&Path::name, std::placeholders::_1)));
725  if (itFound!=trig_paths_.end()) {
726  oLabelsToFill.reserve(itFound->size());
727  for (size_t i = 0; i < itFound->size(); ++i) {
728  oLabelsToFill.push_back(itFound->getWorker(i)->description().moduleLabel());
729  }
730  }
731  }
std::string const & name() const
Definition: Path.h:75
unsigned int edm::StreamSchedule::numberOfUnscheduledModules ( ) const
inline

Definition at line 266 of file StreamSchedule.h.

266  {
268  }
unsigned int number_of_unscheduled_modules_
void edm::StreamSchedule::processOneEventAsync ( WaitingTaskHolder  iTask,
EventPrincipal ep,
EventSetup const &  es 
)

Definition at line 540 of file StreamSchedule.cc.

References actReg_, empty_trig_paths_, finishedPaths(), edm::ServiceRegistry::instance(), edm::make_waiting_task(), eostools::move(), cmsPerfStripChart::operate(), edm::hlt::Pass, edm::ServiceRegistry::presentToken(), resetAll(), results_, edm::WorkerManager::setupOnDemandSystem(), streamContext_, streamID_, total_events_, trig_paths_, and workerManager_.

543  {
544  this->resetAll();
545  for (int empty_trig_path : empty_trig_paths_) {
546  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
547  }
548 
549  using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
550 
551  Traits::setStreamContext(streamContext_, ep);
552  Traits::preScheduleSignal(actReg_.get(), &streamContext_);
553 
554  // This call takes care of the unscheduled processing.
556 
557  ++total_events_;
558  auto serviceToken = ServiceRegistry::instance().presentToken();
559  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
560  [iTask,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
561  {
562  ServiceRegistry::Operate operate(serviceToken);
563 
564  std::exception_ptr ptr;
565  if(iPtr) {
566  ptr = *iPtr;
567  }
568  finishedPaths(ptr, std::move(iTask), ep, es);
569  });
570 
571  //The holder guarantees that if the paths finish before the loop ends
572  // that we do not start too soon. It also guarantees that the task will
573  // run under that condition.
574  WaitingTaskHolder taskHolder(pathsDone);
575 
576  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
577  it != itEnd; ++ it) {
578  it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
579  }
580  }
std::vector< int > empty_trig_paths_
ServiceToken presentToken() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
accept
Definition: HLTenums.h:19
StreamContext streamContext_
static ServiceRegistry & instance()
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
def move(src, dest)
Definition: eostools.py:510
void setupOnDemandSystem(EventPrincipal &principal, EventSetup const &es)
def operate(timelog, memlog, json_f, num)
template<typename T >
void edm::StreamSchedule::processOneStream ( typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 390 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), and edm::convertException::wrap().

392  {
393  this->resetAll();
394 
395  T::setStreamContext(streamContext_, ep);
396  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
397 
398  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
399 
400  // This call takes care of the unscheduled processing.
401  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
402 
403  try {
404  convertException::wrap([&]() {
405  runTriggerPaths<T>(ep, es, &streamContext_);
406 
407  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
408  });
409  }
410  catch(cms::Exception& ex) {
411  if (ex.context().empty()) {
412  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
413  } else {
414  addContextAndPrintException("", ex, cleaningUpAfterException);
415  }
416  throw;
417  }
418  terminationSentry.completedSuccessfully();
419 
420  //If we got here no other exception has happened so we can propogate any Service related exceptions
421  sentry.allowThrow();
422  }
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
volatile bool endpathsAreActive_
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
auto wrap(F iFunc) -> decltype(iFunc())
long double T
template<typename T >
void edm::StreamSchedule::processOneStreamAsync ( WaitingTaskHolder  iTask,
typename T::MyPrincipal &  principal,
EventSetup const &  eventSetup,
bool  cleaningUpAfterException = false 
)

Definition at line 425 of file StreamSchedule.h.

References edm::addContextAndPrintException(), cms::Exception::context(), edm::WaitingTaskHolder::doneWaiting(), edm::ExceptionFromThisContext, h, hcalTTPDigis_cfi::id, edm::ServiceRegistry::instance(), edm::make_functor_task(), edm::make_waiting_task(), AlCaHLTBitMon_ParallelJobs::p, edm::ServiceRegistry::presentToken(), and edm::convertException::wrap().

428  {
430 
431  T::setStreamContext(streamContext_, ep);
432 
433  auto id = ep.id();
434  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
435  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
436  {
437  ServiceRegistry::Operate op(token);
438  std::exception_ptr excpt;
439  if(iPtr) {
440  excpt = *iPtr;
441  //add context information to the exception and print message
442  try {
443  convertException::wrap([&]() {
444  std::rethrow_exception(excpt);
445  });
446  } catch(cms::Exception& ex) {
447  //TODO: should add the transition type info
448  std::ostringstream ost;
449  if(ex.context().empty()) {
450  ost<<"Processing "<<T::transitionName()<<" "<<id;
451  }
452  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
453  excpt = std::current_exception();
454  }
455 
456  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
457  }
458 
459  try {
460  T::postScheduleSignal(actReg_.get(), &streamContext_);
461  } catch(...) {
462  if(not excpt) {
463  excpt = std::current_exception();
464  }
465  }
466  iHolder.doneWaiting(excpt);
467 
468  });
469 
470  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask,&ep,&es,cleaningUpAfterException,token] () mutable {
471  ServiceRegistry::Operate op(token);
472  T::preScheduleSignal(actReg_.get(), &streamContext_);
473  WaitingTaskHolder h(doneTask);
474 
476  for(auto& p : end_paths_) {
477  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
478  }
479 
480  for(auto& p : trig_paths_) {
481  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
482  }
483 
486  });
487 
488  if(streamID_.value() == 0) {
489  //Enqueueing will start another thread if there is only
490  // one thread in the job. Having stream == 0 use spawn
491  // avoids starting up another thread when there is only one stream.
492  tbb::task::spawn( *task);
493  } else {
494  tbb::task::enqueue( *task);
495  }
496  }
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
ServiceToken presentToken() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTask *task, typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context)
StreamContext streamContext_
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
unsigned int value() const
Definition: StreamID.h:46
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
auto wrap(F iFunc) -> decltype(iFunc())
long double T
void edm::StreamSchedule::replaceModule ( maker::ModuleHolder iMod,
std::string const &  iLabel 
)

clone the type of module with label iLabel but configure with iPSet.

Definition at line 511 of file StreamSchedule.cc.

References allWorkers(), edm::Worker::beginStream(), runEdmFileComparison::found, edm::maker::ModuleHolder::replaceModuleFor(), streamContext_, and streamID_.

512  {
513  Worker* found = nullptr;
514  for (auto const& worker : allWorkers()) {
515  if (worker->description().moduleLabel() == iLabel) {
516  found = worker;
517  break;
518  }
519  }
520  if (nullptr == found) {
521  return;
522  }
523 
524  iMod->replaceModuleFor(found);
525  found->beginStream(streamID_,streamContext_);
526  }
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
StreamContext streamContext_
void edm::StreamSchedule::reportSkipped ( EventPrincipal const &  ep) const
inlineprivate

Definition at line 384 of file StreamSchedule.h.

References edm::EventID::event(), edm::EventPrincipal::id(), and edm::EventID::run().

384  {
385  Service<JobReport> reportSvc;
386  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
387  }
void edm::StreamSchedule::resetAll ( )
private

Definition at line 872 of file StreamSchedule.cc.

References results_, and skippingEvent_.

Referenced by processOneEventAsync().

872  {
873  skippingEvent_ = false;
874  results_->reset();
875  }
edm::propagate_const< TrigResPtr > results_
std::atomic< bool > skippingEvent_
void edm::StreamSchedule::resetEarlyDelete ( )
private

Definition at line 883 of file StreamSchedule.cc.

References KineDebug3::count(), earlyDeleteBranchToCount_, earlyDeleteHelpers_, earlyDeleteHelperToBranchIndicies_, and diffTreeTool::index.

Referenced by finishProcessOneEvent(), and initializeEarlyDelete().

883  {
884  //must be sure we have cleared the count first
885  for(auto& count:earlyDeleteBranchToCount_) {
886  count.count = 0;
887  }
888  //now reset based on how many helpers use that branch
890  ++(earlyDeleteBranchToCount_[index].count);
891  }
892  for(auto& helper: earlyDeleteHelpers_) {
893  helper.reset();
894  }
895  }
Definition: helper.py:1
std::vector< BranchToCount > earlyDeleteBranchToCount_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
TrigResConstPtr edm::StreamSchedule::results ( ) const
inlineprivate

Definition at line 340 of file StreamSchedule.h.

References edm::get_underlying_safe().

Referenced by StreamSchedule().

340 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
TrigResPtr& edm::StreamSchedule::results ( )
inlineprivate

Definition at line 341 of file StreamSchedule.h.

References edm::get_underlying_safe().

341 {return get_underlying_safe(results_);}
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
edm::propagate_const< TrigResPtr > results_
template<typename T >
void edm::StreamSchedule::runEndPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 510 of file StreamSchedule.h.

References AlCaHLTBitMon_ParallelJobs::p.

510  {
511  // Note there is no state-checking safety controlling the
512  // activation/deactivation of endpaths.
513  for(auto& p : end_paths_) {
514  p.processOneOccurrence<T>(ep, es, streamID_, context);
515  }
516  }
StreamContext const & context() const
long double T
template<typename T >
bool edm::StreamSchedule::runTriggerPaths ( typename T::MyPrincipal const &  ep,
EventSetup const &  es,
typename T::Context const *  context 
)
private

Definition at line 501 of file StreamSchedule.h.

References AlCaHLTBitMon_ParallelJobs::p.

501  {
502  for(auto& p : trig_paths_) {
503  p.processOneOccurrence<T>(ep, es, streamID_, context);
504  }
505  return results_->accept();
506  }
edm::propagate_const< TrigResPtr > results_
StreamContext const & context() const
long double T
StreamID edm::StreamSchedule::streamID ( ) const
inline

Definition at line 191 of file StreamSchedule.h.

References AlCaHLTBitMon_QueryRunRegistry::string, and triggerPaths.

191 { return streamID_; }
int edm::StreamSchedule::totalEvents ( ) const
inline

Return the number of events this StreamSchedule has tried to process (inclues both successes and failures, including failures due to exceptions during processing).

Definition at line 227 of file StreamSchedule.h.

Referenced by getTriggerReport().

227  {
228  return total_events_;
229  }
int edm::StreamSchedule::totalEventsFailed ( ) const
inline

Return the number of events that have not passed any trigger. (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()

Definition at line 239 of file StreamSchedule.h.

References cuy::rep, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getTriggerReport().

239  {
240  return totalEvents() - totalEventsPassed();
241  }
int totalEvents() const
int totalEventsPassed() const
int edm::StreamSchedule::totalEventsPassed ( ) const
inline

Return the number of events which have been passed by one or more trigger paths.

Definition at line 233 of file StreamSchedule.h.

Referenced by getTriggerReport().

233  {
234  return total_passed_;
235  }
void edm::StreamSchedule::triggerPaths ( std::vector< std::string > &  oLabelsToFill) const

adds to oLabelsToFill the labels for all trigger paths in the process this is different from availablePaths because it includes the empty paths so matches the entries in TriggerResults exactly.

Definition at line 707 of file StreamSchedule.cc.

References trig_name_list_.

707  {
708  oLabelsToFill = trig_name_list_;
709  }

Member Data Documentation

std::shared_ptr<ActivityRegistry> edm::StreamSchedule::actReg_
private
std::vector<BranchToCount> edm::StreamSchedule::earlyDeleteBranchToCount_
private

Definition at line 360 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<EarlyDeleteHelper> edm::StreamSchedule::earlyDeleteHelpers_
private

Definition at line 370 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

std::vector<unsigned int> edm::StreamSchedule::earlyDeleteHelperToBranchIndicies_
private

Definition at line 367 of file StreamSchedule.h.

Referenced by initializeEarlyDelete(), and resetEarlyDelete().

vstring edm::StreamSchedule::empty_trig_path_names_
private

Definition at line 355 of file StreamSchedule.h.

Referenced by fillTrigPath().

std::vector<int> edm::StreamSchedule::empty_trig_paths_
private

Definition at line 354 of file StreamSchedule.h.

Referenced by fillTrigPath(), and processOneEventAsync().

vstring edm::StreamSchedule::end_path_name_list_
private

Definition at line 347 of file StreamSchedule.h.

Referenced by endPaths(), fillWorkers(), and StreamSchedule().

TrigPaths edm::StreamSchedule::end_paths_
private
volatile bool edm::StreamSchedule::endpathsAreActive_
private

Definition at line 378 of file StreamSchedule.h.

Referenced by enableEndPaths(), endPathsEnabled(), and finishedPaths().

unsigned int edm::StreamSchedule::number_of_unscheduled_modules_
private

Definition at line 374 of file StreamSchedule.h.

Referenced by StreamSchedule().

edm::propagate_const<TrigResPtr> edm::StreamSchedule::results_
private

Definition at line 349 of file StreamSchedule.h.

Referenced by finishedPaths(), processOneEventAsync(), and resetAll().

edm::propagate_const<WorkerPtr> edm::StreamSchedule::results_inserter_
private

Definition at line 351 of file StreamSchedule.h.

Referenced by finishedPaths(), and StreamSchedule().

std::atomic<bool> edm::StreamSchedule::skippingEvent_
private

Definition at line 379 of file StreamSchedule.h.

Referenced by fillTrigPath(), and resetAll().

StreamContext edm::StreamSchedule::streamContext_
private
StreamID edm::StreamSchedule::streamID_
private
int edm::StreamSchedule::total_events_
private

Definition at line 372 of file StreamSchedule.h.

Referenced by clearCounters(), and processOneEventAsync().

int edm::StreamSchedule::total_passed_
private

Definition at line 373 of file StreamSchedule.h.

Referenced by clearCounters(), and finishedPaths().

vstring edm::StreamSchedule::trig_name_list_
private

Definition at line 346 of file StreamSchedule.h.

Referenced by StreamSchedule(), and triggerPaths().

TrigPaths edm::StreamSchedule::trig_paths_
private
WorkerManager edm::StreamSchedule::workerManager_
private