CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
85 
86 #include "boost/shared_ptr.hpp"
87 
88 #include <map>
89 #include <memory>
90 #include <set>
91 #include <string>
92 #include <vector>
93 #include <sstream>
94 
95 namespace edm {
96 
97  class ActivityRegistry;
98  class BranchIDListHelper;
99  class EventSetup;
100  class ExceptionCollector;
101  class OutputModuleCommunicator;
102  class ProcessContext;
103  class RunStopwatch;
104  class UnscheduledCallProducer;
105  class WorkerInPath;
106  struct TriggerTimingReport;
107  class ModuleRegistry;
108  class TriggerResultInserter;
109  class PreallocationConfiguration;
110 
111  namespace service {
112  class TriggerNamesService;
113  }
114 
115  namespace {
116  template <typename T>
117  class StreamScheduleSignalSentry {
118  public:
119  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
120  a_(a), context_(context), allowThrow_(false) {
121  if (a_) T::preScheduleSignal(a_, context_);
122  }
123  ~StreamScheduleSignalSentry() noexcept(false) {
124  try {
125  if (a_) { T::postScheduleSignal(a_, context_); }
126  } catch(...) {
127  if(allowThrow_) {throw;}
128  }
129  }
130 
131  void allowThrow() {
132  allowThrow_ = true;
133  }
134 
135  private:
136  // We own none of these resources.
137  ActivityRegistry* a_;
138  typename T::Context const* context_;
139  bool allowThrow_;
140  };
141  }
142 
144  public:
145  typedef std::vector<std::string> vstring;
146  typedef std::vector<Path> TrigPaths;
147  typedef std::vector<Path> NonTrigPaths;
148  typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
149  typedef boost::shared_ptr<Worker> WorkerPtr;
150  typedef std::vector<Worker*> AllWorkers;
151  typedef std::vector<boost::shared_ptr<OutputModuleCommunicator>> AllOutputModuleCommunicators;
152 
153  typedef std::vector<Worker*> Workers;
154 
155  typedef std::vector<WorkerInPath> PathWorkers;
156 
158  boost::shared_ptr<ModuleRegistry>,
159  ParameterSet& proc_pset,
162  ProductRegistry& pregistry,
163  BranchIDListHelper& branchIDListHelper,
165  boost::shared_ptr<ActivityRegistry> areg,
166  boost::shared_ptr<ProcessConfiguration> processConfiguration,
167  bool allowEarlyDelete,
169  ProcessContext const* processContext);
170 
171  StreamSchedule(StreamSchedule const&) = delete;
172 
173  template <typename T>
174  void processOneEvent(typename T::MyPrincipal& principal,
175  EventSetup const& eventSetup,
176  bool cleaningUpAfterException = false);
177 
178  template <typename T>
179  void processOneStream(typename T::MyPrincipal& principal,
180  EventSetup const& eventSetup,
181  bool cleaningUpAfterException = false);
182 
183  void beginStream();
184  void endStream();
185 
186  StreamID streamID() const { return streamID_; }
187 
188  std::pair<double, double> timeCpuReal() const {
189  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
190  }
191 
194 
198  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
199 
201  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
202 
204  void modulesInPath(std::string const& iPathLabel,
205  std::vector<std::string>& oLabelsToFill) const;
206 
210  int totalEvents() const {
211  return total_events_;
212  }
213 
216  int totalEventsPassed() const {
217  return total_passed_;
218  }
219 
222  int totalEventsFailed() const {
223  return totalEvents() - totalEventsPassed();
224  }
225 
228  void enableEndPaths(bool active);
229 
232  bool endPathsEnabled() const;
233 
236  void getTriggerReport(TriggerReport& rep) const;
237 
241 
243  void clearCounters();
244 
246  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
247 
249  AllWorkers const& allWorkers() const {
250  return workerManager_.allWorkers();
251  }
252 
253  unsigned int numberOfUnscheduledModules() const {
255  }
256 
257  private:
260  return workerManager_.actionTable();
261  }
262 
263 
264  void resetAll();
265 
266  template <typename T>
267  bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
268 
269  template <typename T>
270  void runEndPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
271 
272  void reportSkipped(EventPrincipal const& ep) const;
273 
274  void fillWorkers(ParameterSet& proc_pset,
277  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
278  std::string const& name, bool ignoreFilters, PathWorkers& out,
279  vstring* labelsOnPaths);
280  void fillTrigPath(ParameterSet& proc_pset,
283  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
284  int bitpos, std::string const& name, TrigResPtr,
285  vstring* labelsOnTriggerPaths);
286  void fillEndPath(ParameterSet& proc_pset,
289  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
290  int bitpos, std::string const& name);
291 
292  void addToAllWorkers(Worker* w);
293 
294  void resetEarlyDelete();
296  edm::ParameterSet const& opts,
297  edm::ProductRegistry const& preg,
298  bool allowEarlyDelete);
299 
301  boost::shared_ptr<ActivityRegistry> actReg_;
302 
305 
307 
311  std::vector<int> empty_trig_paths_;
313 
314  //For each branch that has been marked for early deletion
315  // keep track of how many modules are left that read this data but have
316  // not yet been run in this event
317  std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
318  //NOTE the following is effectively internal data for each EarlyDeleteHelper
319  // but putting it into one vector makes for better allocation as well as
320  // faster iteration when used to reset the earlyDeleteBranchToCount_
321  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
322  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
323  // tell which EarlyDeleteHelper is associated with which BranchIDs.
324  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
325  //There is one EarlyDeleteHelper per Module which are reading data that
326  // has been marked for early deletion
327  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
328 
333 
337  volatile bool endpathsAreActive_;
338  };
339 
340  void
341  inline
343  Service<JobReport> reportSvc;
344  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
345  }
346 
347  template <typename T>
348  void StreamSchedule::processOneEvent(typename T::MyPrincipal& ep,
349  EventSetup const& es,
350  bool cleaningUpAfterException) {
351  this->resetAll();
352  for (int empty_trig_path : empty_trig_paths_) {
353  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
354  }
355 
356  T::setStreamContext(streamContext_, ep);
357  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
358 
359  // A RunStopwatch, but only if we are processing an event.
360  RunStopwatch stopwatch(stopwatch_);
361 
362  // This call takes care of the unscheduled processing.
363  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
364 
365  ++total_events_;
366  try {
367  convertException::wrap([&]() {
368  try {
369  if (runTriggerPaths<T>(ep, es, &streamContext_)) {
370  ++total_passed_;
371  }
372  }
373  catch(cms::Exception& e) {
375  assert (action != exception_actions::IgnoreCompletely);
376  assert (action != exception_actions::FailPath);
377  if (action == exception_actions::SkipEvent) {
378  edm::printCmsExceptionWarning("SkipEvent", e);
379  } else {
380  throw;
381  }
382  }
383 
384  try {
385  CPUTimer timer;
386  ParentContext parentContext(&streamContext_);
387  if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, &timer,streamID_, parentContext, &streamContext_);
388  }
389  catch (cms::Exception & ex) {
390  if (T::isEvent_) {
391  ex.addContext("Calling produce method for module TriggerResultInserter");
392  }
393  std::ostringstream ost;
394  ost << "Processing " << ep.id();
395  ex.addContext(ost.str());
396  throw;
397  }
398 
399  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
401  });
402  }
403  catch(cms::Exception& ex) {
404  if (ex.context().empty()) {
405  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
406  } else {
407  addContextAndPrintException("", ex, cleaningUpAfterException);
408  }
409  throw;
410  }
411  //If we got here no other exception has happened so we can propogate any Service related exceptions
412  sentry.allowThrow();
413  }
414 
415  template <typename T>
416  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
417  EventSetup const& es,
418  bool cleaningUpAfterException) {
419  this->resetAll();
420 
421  T::setStreamContext(streamContext_, ep);
422  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
423 
424  // This call takes care of the unscheduled processing.
425  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
426 
427  try {
428  convertException::wrap([&]() {
429  runTriggerPaths<T>(ep, es, &streamContext_);
430 
431  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
432  });
433  }
434  catch(cms::Exception& ex) {
435  if (ex.context().empty()) {
436  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
437  } else {
438  addContextAndPrintException("", ex, cleaningUpAfterException);
439  }
440  throw;
441  }
442  //If we got here no other exception has happened so we can propogate any Service related exceptions
443  sentry.allowThrow();
444  }
445 
446  template <typename T>
447  bool
448  StreamSchedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
449  for(auto& p : trig_paths_) {
450  p.processOneOccurrence<T>(ep, es, streamID_, context);
451  }
452  return results_->accept();
453  }
454 
455  template <typename T>
456  void
457  StreamSchedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
458  // Note there is no state-checking safety controlling the
459  // activation/deactivation of endpaths.
460  for(auto& p : end_paths_) {
461  p.processOneOccurrence<T>(ep, es, streamID_, context);
462  }
463  }
464 }
465 
466 #endif
RunNumber_t run() const
Definition: EventID.h:42
EventNumber_t event() const
Definition: EventID.h:44
std::vector< boost::shared_ptr< OutputModuleCommunicator > > AllOutputModuleCommunicators
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void runEndPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:92
volatile bool endpathsAreActive_
EventID const & id() const
int totalEvents() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:369
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
void reportSkipped(EventPrincipal const &ep) const
bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
ExceptionToActionTable const & actionTable() const
returns the action table
void processOneEvent(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
actions
Definition: Schedule.cc:369
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:22
accept
Definition: HLTenums.h:19
std::pair< double, double > timeCpuReal() const
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:369
tuple out
Definition: dbtoconf.py:99
void clearCounters()
Clear all the counters in the trigger report.
#define noexcept
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
WorkerPtr results_inserter_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
RunStopwatch::StopwatchPointer stopwatch_
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
void addContext(std::string const &context)
Definition: Exception.cc:227
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void getTriggerTimingReport(TriggerTimingReport &rep) const
bool endPathsEnabled() const
double a
Definition: hdecay.h:121
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
T w() const
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
preg
Definition: Schedule.cc:369
long double T
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:354
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:369