CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
84 
85 #include <map>
86 #include <memory>
87 #include <set>
88 #include <string>
89 #include <vector>
90 #include <sstream>
91 
92 namespace edm {
93 
94  class ActivityRegistry;
95  class BranchIDListHelper;
96  class EventSetup;
97  class ExceptionCollector;
98  class OutputModuleCommunicator;
99  class ProcessContext;
100  class UnscheduledCallProducer;
101  class WorkerInPath;
102  class ModuleRegistry;
103  class TriggerResultInserter;
104  class PreallocationConfiguration;
105 
106  namespace service {
107  class TriggerNamesService;
108  }
109 
110  namespace {
111  template <typename T>
112  class StreamScheduleSignalSentry {
113  public:
114  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
115  a_(a), context_(context), allowThrow_(false) {
116  if (a_) T::preScheduleSignal(a_, context_);
117  }
118  ~StreamScheduleSignalSentry() noexcept(false) {
119  try {
120  if (a_) { T::postScheduleSignal(a_, context_); }
121  } catch(...) {
122  if(allowThrow_) {throw;}
123  }
124  }
125 
126  void allowThrow() {
127  allowThrow_ = true;
128  }
129 
130  private:
131  // We own none of these resources.
132  ActivityRegistry* a_;
133  typename T::Context const* context_;
134  bool allowThrow_;
135  };
136  }
137 
139  public:
140  typedef std::vector<std::string> vstring;
141  typedef std::vector<Path> TrigPaths;
142  typedef std::vector<Path> NonTrigPaths;
143  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
144  typedef std::shared_ptr<Worker> WorkerPtr;
145  typedef std::vector<Worker*> AllWorkers;
146  typedef std::vector<std::shared_ptr<OutputModuleCommunicator> > AllOutputModuleCommunicators;
147 
148  typedef std::vector<Worker*> Workers;
149 
150  typedef std::vector<WorkerInPath> PathWorkers;
151 
152  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
153  std::shared_ptr<ModuleRegistry>,
154  ParameterSet& proc_pset,
157  ProductRegistry& pregistry,
158  BranchIDListHelper& branchIDListHelper,
160  std::shared_ptr<ActivityRegistry> areg,
161  std::shared_ptr<ProcessConfiguration> processConfiguration,
162  bool allowEarlyDelete,
164  ProcessContext const* processContext);
165 
166  StreamSchedule(StreamSchedule const&) = delete;
167 
168  template <typename T>
169  void processOneEvent(typename T::MyPrincipal& principal,
170  EventSetup const& eventSetup,
171  bool cleaningUpAfterException = false);
172 
173  template <typename T>
174  void processOneStream(typename T::MyPrincipal& principal,
175  EventSetup const& eventSetup,
176  bool cleaningUpAfterException = false);
177 
178  void beginStream();
179  void endStream();
180 
181  StreamID streamID() const { return streamID_; }
182 
185 
189  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
190 
192  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
193 
195  void modulesInPath(std::string const& iPathLabel,
196  std::vector<std::string>& oLabelsToFill) const;
197 
201  int totalEvents() const {
202  return total_events_;
203  }
204 
207  int totalEventsPassed() const {
208  return total_passed_;
209  }
210 
213  int totalEventsFailed() const {
214  return totalEvents() - totalEventsPassed();
215  }
216 
219  void enableEndPaths(bool active);
220 
223  bool endPathsEnabled() const;
224 
227  void getTriggerReport(TriggerReport& rep) const;
228 
230  void clearCounters();
231 
233  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
234 
236  AllWorkers const& allWorkers() const {
237  return workerManager_.allWorkers();
238  }
239 
240  unsigned int numberOfUnscheduledModules() const {
242  }
243 
244  private:
245  //Sentry class to only send a signal if an
246  // exception occurs. An exception is identified
247  // by the destructor being called without first
248  // calling completedSuccessfully().
250  public:
252  reg_(iReg),
253  context_(iContext){}
255  if(reg_) {
257  }
258  }
260  reg_ = nullptr;
261  }
262  private:
265  };
266 
269  return workerManager_.actionTable();
270  }
271 
272 
273  void resetAll();
274 
275  template <typename T>
276  bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
277 
278  template <typename T>
279  void runEndPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
280 
281  void reportSkipped(EventPrincipal const& ep) const;
282 
283  void fillWorkers(ParameterSet& proc_pset,
286  std::shared_ptr<ProcessConfiguration const> processConfiguration,
287  std::string const& name, bool ignoreFilters, PathWorkers& out,
288  vstring* labelsOnPaths);
289  void fillTrigPath(ParameterSet& proc_pset,
292  std::shared_ptr<ProcessConfiguration const> processConfiguration,
293  int bitpos, std::string const& name, TrigResPtr,
294  vstring* labelsOnTriggerPaths);
295  void fillEndPath(ParameterSet& proc_pset,
298  std::shared_ptr<ProcessConfiguration const> processConfiguration,
299  int bitpos, std::string const& name);
300 
301  void addToAllWorkers(Worker* w);
302 
303  void resetEarlyDelete();
305  edm::ParameterSet const& opts,
306  edm::ProductRegistry const& preg,
307  bool allowEarlyDelete);
308 
310  std::shared_ptr<ActivityRegistry> actReg_;
311 
314 
316 
320  std::vector<int> empty_trig_paths_;
322 
323  //For each branch that has been marked for early deletion
324  // keep track of how many modules are left that read this data but have
325  // not yet been run in this event
326  std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
327  //NOTE the following is effectively internal data for each EarlyDeleteHelper
328  // but putting it into one vector makes for better allocation as well as
329  // faster iteration when used to reset the earlyDeleteBranchToCount_
330  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
331  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
332  // tell which EarlyDeleteHelper is associated with which BranchIDs.
333  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
334  //There is one EarlyDeleteHelper per Module which are reading data that
335  // has been marked for early deletion
336  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
337 
341 
344  volatile bool endpathsAreActive_;
345  };
346 
347  void
348  inline
350  Service<JobReport> reportSvc;
351  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
352  }
353 
354  template <typename T>
355  void StreamSchedule::processOneEvent(typename T::MyPrincipal& ep,
356  EventSetup const& es,
357  bool cleaningUpAfterException) {
358  this->resetAll();
359  for (int empty_trig_path : empty_trig_paths_) {
360  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
361  }
362 
363  T::setStreamContext(streamContext_, ep);
364  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
365 
366  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
367  // This call takes care of the unscheduled processing.
368  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
369 
370  ++total_events_;
371  try {
372  convertException::wrap([&]() {
373  try {
374  if (runTriggerPaths<T>(ep, es, &streamContext_)) {
375  ++total_passed_;
376  }
377  }
378  catch(cms::Exception& e) {
380  assert (action != exception_actions::IgnoreCompletely);
381  assert (action != exception_actions::FailPath);
382  if (action == exception_actions::SkipEvent) {
383  edm::printCmsExceptionWarning("SkipEvent", e);
384  } else {
385  throw;
386  }
387  }
388 
389  try {
390  ParentContext parentContext(&streamContext_);
391  if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, streamID_, parentContext, &streamContext_);
392  }
393  catch (cms::Exception & ex) {
394  if (T::isEvent_) {
395  ex.addContext("Calling produce method for module TriggerResultInserter");
396  }
397  std::ostringstream ost;
398  ost << "Processing " << ep.id();
399  ex.addContext(ost.str());
400  throw;
401  }
402 
403  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
405  });
406  }
407  catch(cms::Exception& ex) {
408  if (ex.context().empty()) {
409  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
410  } else {
411  addContextAndPrintException("", ex, cleaningUpAfterException);
412  }
413  throw;
414  }
415  terminationSentry.completedSuccessfully();
416 
417  //If we got here no other exception has happened so we can propogate any Service related exceptions
418  sentry.allowThrow();
419  }
420 
421  template <typename T>
422  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
423  EventSetup const& es,
424  bool cleaningUpAfterException) {
425  this->resetAll();
426 
427  T::setStreamContext(streamContext_, ep);
428  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
429 
430  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
431 
432  // This call takes care of the unscheduled processing.
433  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
434 
435  try {
436  convertException::wrap([&]() {
437  runTriggerPaths<T>(ep, es, &streamContext_);
438 
439  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
440  });
441  }
442  catch(cms::Exception& ex) {
443  if (ex.context().empty()) {
444  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
445  } else {
446  addContextAndPrintException("", ex, cleaningUpAfterException);
447  }
448  throw;
449  }
450  terminationSentry.completedSuccessfully();
451 
452  //If we got here no other exception has happened so we can propogate any Service related exceptions
453  sentry.allowThrow();
454  }
455 
456  template <typename T>
457  bool
458  StreamSchedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
459  for(auto& p : trig_paths_) {
460  p.processOneOccurrence<T>(ep, es, streamID_, context);
461  }
462  return results_->accept();
463  }
464 
465  template <typename T>
466  void
467  StreamSchedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
468  // Note there is no state-checking safety controlling the
469  // activation/deactivation of endpaths.
470  for(auto& p : end_paths_) {
471  p.processOneOccurrence<T>(ep, es, streamID_, context);
472  }
473  }
474 }
475 
476 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void runEndPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:91
volatile bool endpathsAreActive_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:370
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
void reportSkipped(EventPrincipal const &ep) const
bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
ExceptionToActionTable const & actionTable() const
returns the action table
void processOneEvent(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:370
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
accept
Definition: HLTenums.h:19
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::vector< std::shared_ptr< OutputModuleCommunicator > > AllOutputModuleCommunicators
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:370
tuple out
Definition: dbtoconf.py:99
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:63
WorkerPtr results_inserter_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void addContext(std::string const &context)
Definition: Exception.cc:227
bool endPathsEnabled() const
double a
Definition: hdecay.h:121
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
preg
Definition: Schedule.cc:370
long double T
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:354
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:67
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:370