CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
85 
86 #include <map>
87 #include <memory>
88 #include <set>
89 #include <string>
90 #include <vector>
91 #include <sstream>
92 
93 namespace edm {
94 
95  class ActivityRegistry;
96  class BranchIDListHelper;
97  class EventSetup;
98  class ExceptionCollector;
99  class OutputModuleCommunicator;
100  class ProcessContext;
101  class UnscheduledCallProducer;
102  class WorkerInPath;
103  class ModuleRegistry;
104  class TriggerResultInserter;
105  class PreallocationConfiguration;
106 
107  namespace service {
108  class TriggerNamesService;
109  }
110 
111  namespace {
112  template <typename T>
113  class StreamScheduleSignalSentry {
114  public:
115  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
116  a_(a), context_(context), allowThrow_(false) {
117  if (a_) T::preScheduleSignal(a_, context_);
118  }
119  ~StreamScheduleSignalSentry() noexcept(false) {
120  try {
121  if (a_) { T::postScheduleSignal(a_, context_); }
122  } catch(...) {
123  if(allowThrow_) {throw;}
124  }
125  }
126 
127  void allowThrow() {
128  allowThrow_ = true;
129  }
130 
131  private:
132  // We own none of these resources.
133  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
134  typename T::Context const* context_;
135  bool allowThrow_;
136  };
137  }
138 
140  public:
141  typedef std::vector<std::string> vstring;
142  typedef std::vector<Path> TrigPaths;
143  typedef std::vector<Path> NonTrigPaths;
144  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
145  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
146  typedef std::shared_ptr<Worker> WorkerPtr;
147  typedef std::vector<Worker*> AllWorkers;
148 
149  typedef std::vector<Worker*> Workers;
150 
151  typedef std::vector<WorkerInPath> PathWorkers;
152 
153  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
154  std::shared_ptr<ModuleRegistry>,
155  ParameterSet& proc_pset,
158  ProductRegistry& pregistry,
159  BranchIDListHelper& branchIDListHelper,
161  std::shared_ptr<ActivityRegistry> areg,
162  std::shared_ptr<ProcessConfiguration> processConfiguration,
163  bool allowEarlyDelete,
165  ProcessContext const* processContext);
166 
167  StreamSchedule(StreamSchedule const&) = delete;
168 
169  template <typename T>
170  void processOneEvent(typename T::MyPrincipal& principal,
171  EventSetup const& eventSetup,
172  bool cleaningUpAfterException = false);
173 
174  template <typename T>
175  void processOneStream(typename T::MyPrincipal& principal,
176  EventSetup const& eventSetup,
177  bool cleaningUpAfterException = false);
178 
179  void beginStream();
180  void endStream();
181 
182  StreamID streamID() const { return streamID_; }
183 
186 
190  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
191 
193  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
194 
198  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
199 
201  void endPaths(std::vector<std::string>& oLabelsToFill) const;
202 
204  void modulesInPath(std::string const& iPathLabel,
205  std::vector<std::string>& oLabelsToFill) const;
206 
207  void moduleDescriptionsInPath(std::string const& iPathLabel,
208  std::vector<ModuleDescription const*>& descriptions,
209  unsigned int hint) const;
210 
211  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
212  std::vector<ModuleDescription const*>& descriptions,
213  unsigned int hint) const;
214 
218  int totalEvents() const {
219  return total_events_;
220  }
221 
224  int totalEventsPassed() const {
225  return total_passed_;
226  }
227 
230  int totalEventsFailed() const {
231  return totalEvents() - totalEventsPassed();
232  }
233 
236  void enableEndPaths(bool active);
237 
240  bool endPathsEnabled() const;
241 
244  void getTriggerReport(TriggerReport& rep) const;
245 
247  void clearCounters();
248 
250  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
251 
253  AllWorkers const& allWorkers() const {
254  return workerManager_.allWorkers();
255  }
256 
257  unsigned int numberOfUnscheduledModules() const {
259  }
260 
261  private:
262  //Sentry class to only send a signal if an
263  // exception occurs. An exception is identified
264  // by the destructor being called without first
265  // calling completedSuccessfully().
267  public:
269  reg_(iReg),
270  context_(iContext){}
272  if(reg_) {
274  }
275  }
277  reg_ = nullptr;
278  }
279  private:
280  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
282  };
283 
286  return workerManager_.actionTable();
287  }
288 
289 
290  void resetAll();
291 
292  template <typename T>
293  bool runTriggerPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
294 
295  template <typename T>
296  void runEndPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
297 
298  void reportSkipped(EventPrincipal const& ep) const;
299 
300  void fillWorkers(ParameterSet& proc_pset,
303  std::shared_ptr<ProcessConfiguration const> processConfiguration,
304  std::string const& name, bool ignoreFilters, PathWorkers& out,
305  vstring* labelsOnPaths);
306  void fillTrigPath(ParameterSet& proc_pset,
309  std::shared_ptr<ProcessConfiguration const> processConfiguration,
310  int bitpos, std::string const& name, TrigResPtr,
311  vstring* labelsOnTriggerPaths);
312  void fillEndPath(ParameterSet& proc_pset,
315  std::shared_ptr<ProcessConfiguration const> processConfiguration,
316  int bitpos, std::string const& name);
317 
318  void addToAllWorkers(Worker* w);
319 
320  void resetEarlyDelete();
322  edm::ParameterSet const& opts,
323  edm::ProductRegistry const& preg,
324  bool allowEarlyDelete);
325 
328 
330  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
331 
334 
336 
340  std::vector<int> empty_trig_paths_;
342 
343  //For each branch that has been marked for early deletion
344  // keep track of how many modules are left that read this data but have
345  // not yet been run in this event
346  std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
347  //NOTE the following is effectively internal data for each EarlyDeleteHelper
348  // but putting it into one vector makes for better allocation as well as
349  // faster iteration when used to reset the earlyDeleteBranchToCount_
350  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
351  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
352  // tell which EarlyDeleteHelper is associated with which BranchIDs.
353  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
354  //There is one EarlyDeleteHelper per Module which are reading data that
355  // has been marked for early deletion
356  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
357 
361 
364  volatile bool endpathsAreActive_;
365  };
366 
367  void
368  inline
370  Service<JobReport> reportSvc;
371  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
372  }
373 
374  template <typename T>
375  void StreamSchedule::processOneEvent(typename T::MyPrincipal& ep,
376  EventSetup const& es,
377  bool cleaningUpAfterException) {
378  this->resetAll();
379  for (int empty_trig_path : empty_trig_paths_) {
380  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
381  }
382 
383  T::setStreamContext(streamContext_, ep);
384  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
385 
386  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
387  // This call takes care of the unscheduled processing.
388  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
389 
390  ++total_events_;
391  try {
392  convertException::wrap([&]() {
393  try {
394  if (runTriggerPaths<T>(ep, es, &streamContext_)) {
395  ++total_passed_;
396  }
397  }
398  catch(cms::Exception& e) {
402  if (action == exception_actions::SkipEvent) {
403  edm::printCmsExceptionWarning("SkipEvent", e);
404  } else {
405  throw;
406  }
407  }
408 
409  try {
410  ParentContext parentContext(&streamContext_);
411  if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, streamID_, parentContext, &streamContext_);
412  }
413  catch (cms::Exception & ex) {
414  if (T::isEvent_) {
415  ex.addContext("Calling produce method for module TriggerResultInserter");
416  }
417  std::ostringstream ost;
418  ost << "Processing " << ep.id();
419  ex.addContext(ost.str());
420  throw;
421  }
422 
423  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
425  });
426  }
427  catch(cms::Exception& ex) {
428  if (ex.context().empty()) {
429  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
430  } else {
431  addContextAndPrintException("", ex, cleaningUpAfterException);
432  }
433  throw;
434  }
435  terminationSentry.completedSuccessfully();
436 
437  //If we got here no other exception has happened so we can propogate any Service related exceptions
438  sentry.allowThrow();
439  }
440 
441  template <typename T>
442  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
443  EventSetup const& es,
444  bool cleaningUpAfterException) {
445  this->resetAll();
446 
447  T::setStreamContext(streamContext_, ep);
448  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
449 
450  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
451 
452  // This call takes care of the unscheduled processing.
453  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
454 
455  try {
456  convertException::wrap([&]() {
457  runTriggerPaths<T>(ep, es, &streamContext_);
458 
459  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
460  });
461  }
462  catch(cms::Exception& ex) {
463  if (ex.context().empty()) {
464  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
465  } else {
466  addContextAndPrintException("", ex, cleaningUpAfterException);
467  }
468  throw;
469  }
470  terminationSentry.completedSuccessfully();
471 
472  //If we got here no other exception has happened so we can propogate any Service related exceptions
473  sentry.allowThrow();
474  }
475 
476  template <typename T>
477  bool
478  StreamSchedule::runTriggerPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
479  for(auto& p : trig_paths_) {
480  p.processOneOccurrence<T>(ep, es, streamID_, context);
481  }
482  return results_->accept();
483  }
484 
485  template <typename T>
486  void
487  StreamSchedule::runEndPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
488  // Note there is no state-checking safety controlling the
489  // activation/deactivation of endpaths.
490  for(auto& p : end_paths_) {
491  p.processOneOccurrence<T>(ep, es, streamID_, context);
492  }
493  }
494 }
495 
496 #endif
RunNumber_t run() const
Definition: EventID.h:39
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
Definition: EventID.h:41
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:95
volatile bool endpathsAreActive_
assert(m_qm.get())
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:374
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
void processOneEvent(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:374
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
accept
Definition: HLTenums.h:19
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
element_type const * get() const
areg
Definition: Schedule.cc:374
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
void addContext(std::string const &context)
Definition: Exception.cc:227
string action
Definition: mps_fire.py:28
bool endPathsEnabled() const
double a
Definition: hdecay.h:121
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
preg
Definition: Schedule.cc:374
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:358
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:374
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process