CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in objects held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
84 
85 #include <map>
86 #include <memory>
87 #include <set>
88 #include <string>
89 #include <vector>
90 #include <sstream>
91 
92 namespace edm {
93 
94  class ActivityRegistry;
95  class BranchIDListHelper;
96  class EventSetup;
97  class ExceptionCollector;
98  class OutputModuleCommunicator;
99  class ProcessContext;
100  class UnscheduledCallProducer;
101  class WorkerInPath;
102  class ModuleRegistry;
103  class TriggerResultInserter;
104  class PreallocationConfiguration;
105 
106  namespace service {
107  class TriggerNamesService;
108  }
109 
110  namespace {
/// Scope guard that brackets one schedule transition with a pair of signals.
///
/// The constructor calls T::preScheduleSignal and the destructor calls
/// T::postScheduleSignal, both only when a non-null registry was supplied.
/// An exception thrown by the post signal is swallowed unless allowThrow()
/// was called beforehand, which is why the destructor is noexcept(false).
///
/// The guard is non-copyable: a copy would run the destructor a second time
/// and emit the post signal twice for a single pre signal.
///
/// @tparam T traits type providing the nested type Context and the static
///           functions preScheduleSignal / postScheduleSignal.
template <typename T>
class StreamScheduleSignalSentry {
public:
  /// Stores both pointers (neither is owned) and emits the pre signal.
  /// `class ActivityRegistry` names the registry type that is forward-declared
  /// in the enclosing namespace.
  /// @param a       registry to signal through; may be null, in which case no
  ///                signal is emitted by the constructor or the destructor.
  /// @param context forwarded unchanged to both signal functions.
  StreamScheduleSignalSentry(class ActivityRegistry* a, typename T::Context const* context)
      : a_(a), context_(context), allowThrow_(false) {
    if (a_) {
      T::preScheduleSignal(a_, context_);
    }
  }

  StreamScheduleSignalSentry(StreamScheduleSignalSentry const&) = delete;
  StreamScheduleSignalSentry& operator=(StreamScheduleSignalSentry const&) = delete;

  /// Emits the post signal; rethrows a failure only after allowThrow().
  ~StreamScheduleSignalSentry() noexcept(false) {
    try {
      if (a_) {
        T::postScheduleSignal(a_, context_);
      }
    } catch (...) {
      // Until the owner opts in, a failing post signal must not escape.
      if (allowThrow_) {
        throw;
      }
    }
  }

  /// Lets an exception from the post signal propagate out of the destructor.
  void allowThrow() { allowThrow_ = true; }

private:
  // We own none of these resources.
  class ActivityRegistry* a_;
  typename T::Context const* context_;
  bool allowThrow_;
};
136  }
137 
139  public:
140  typedef std::vector<std::string> vstring;
141  typedef std::vector<Path> TrigPaths;
142  typedef std::vector<Path> NonTrigPaths;
143  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
144  typedef std::shared_ptr<Worker> WorkerPtr;
145  typedef std::vector<Worker*> AllWorkers;
146  typedef std::vector<std::shared_ptr<OutputModuleCommunicator> > AllOutputModuleCommunicators;
147 
148  typedef std::vector<Worker*> Workers;
149 
150  typedef std::vector<WorkerInPath> PathWorkers;
151 
152  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
153  std::shared_ptr<ModuleRegistry>,
154  ParameterSet& proc_pset,
157  ProductRegistry& pregistry,
158  BranchIDListHelper& branchIDListHelper,
160  std::shared_ptr<ActivityRegistry> areg,
161  std::shared_ptr<ProcessConfiguration> processConfiguration,
162  bool allowEarlyDelete,
164  ProcessContext const* processContext);
165 
166  StreamSchedule(StreamSchedule const&) = delete;
167 
168  template <typename T>
169  void processOneEvent(typename T::MyPrincipal& principal,
170  EventSetup const& eventSetup,
171  bool cleaningUpAfterException = false);
172 
173  template <typename T>
174  void processOneStream(typename T::MyPrincipal& principal,
175  EventSetup const& eventSetup,
176  bool cleaningUpAfterException = false);
177 
178  void beginStream();
179  void endStream();
180 
181  StreamID streamID() const { return streamID_; }
182 
185 
 // Returns pointers to ModuleDescription objects.
 // NOTE(review): presumably one entry per module known to this schedule;
 // the defining source file is not visible here -- confirm.
189  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
190 
 // Adds to oLabelsToFill the labels for all paths in the process.
192  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
193 
 // NOTE(review): by analogy with availablePaths/endPaths this presumably
 // adds the labels of the trigger paths only -- confirm.
197  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
198 
 // Adds to oLabelsToFill the labels for all end paths in the process.
200  void endPaths(std::vector<std::string>& oLabelsToFill) const;
201 
 // Adds to oLabelsToFill, in execution order, the labels of all modules in
 // path iPathLabel.
203  void modulesInPath(std::string const& iPathLabel,
204  std::vector<std::string>& oLabelsToFill) const;
205 
 // Fills descriptions for the path named iPathLabel.
 // NOTE(review): the meaning of hint is not visible in this header -- confirm.
206  void moduleDescriptionsInPath(std::string const& iPathLabel,
207  std::vector<ModuleDescription const*>& descriptions,
208  unsigned int hint) const;
209 
 // Fills descriptions for the end path named iEndPathLabel.
 // NOTE(review): the meaning of hint is not visible in this header -- confirm.
210  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
211  std::vector<ModuleDescription const*>& descriptions,
212  unsigned int hint) const;
213 
217  int totalEvents() const {
218  return total_events_;
219  }
220 
223  int totalEventsPassed() const {
224  return total_passed_;
225  }
226 
229  int totalEventsFailed() const {
230  return totalEvents() - totalEventsPassed();
231  }
232 
 // NOTE(review): presumably sets endpathsAreActive_, the flag that
 // processOneEvent/processOneStream test before running end paths -- confirm.
235  void enableEndPaths(bool active);
236 
 // NOTE(review): presumably reports the current value of endpathsAreActive_
 // -- confirm against the implementation.
239  bool endPathsEnabled() const;
240 
 // Fills rep with this schedule's trigger report; the exact fields written
 // are defined in the implementation, which is not visible here.
243  void getTriggerReport(TriggerReport& rep) const;
244 
 // Clear all the counters in the trigger report.
246  void clearCounters();
247 
 // Clone the type of module with label iLabel but configure with iPSet.
 // NOTE(review): that description mentions iPSet, yet the signature takes a
 // maker::ModuleHolder* (iMod); the wording looks stale -- confirm.
249  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
250 
252  AllWorkers const& allWorkers() const {
253  return workerManager_.allWorkers();
254  }
255 
256  unsigned int numberOfUnscheduledModules() const {
258  }
259 
260  private:
261  //Sentry class to only send a signal if an
262  // exception occurs. An exception is identified
263  // by the destructor being called without first
264  // calling completedSuccessfully().
266  public:
268  reg_(iReg),
269  context_(iContext){}
271  if(reg_) {
273  }
274  }
276  reg_ = nullptr;
277  }
278  private:
281  };
282 
285  return workerManager_.actionTable();
286  }
287 
288 
289  void resetAll();
290 
291  template <typename T>
292  bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
293 
294  template <typename T>
295  void runEndPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
296 
297  void reportSkipped(EventPrincipal const& ep) const;
298 
299  void fillWorkers(ParameterSet& proc_pset,
302  std::shared_ptr<ProcessConfiguration const> processConfiguration,
303  std::string const& name, bool ignoreFilters, PathWorkers& out,
304  vstring* labelsOnPaths);
305  void fillTrigPath(ParameterSet& proc_pset,
308  std::shared_ptr<ProcessConfiguration const> processConfiguration,
309  int bitpos, std::string const& name, TrigResPtr,
310  vstring* labelsOnTriggerPaths);
311  void fillEndPath(ParameterSet& proc_pset,
314  std::shared_ptr<ProcessConfiguration const> processConfiguration,
315  int bitpos, std::string const& name);
316 
317  void addToAllWorkers(Worker* w);
318 
319  void resetEarlyDelete();
321  edm::ParameterSet const& opts,
322  edm::ProductRegistry const& preg,
323  bool allowEarlyDelete);
324 
326  std::shared_ptr<ActivityRegistry> actReg_;
327 
330 
332 
 // Indexes into results_ that processOneEvent sets to HLTPathStatus(hlt::Pass, 0)
 // at the start of every event.
336  std::vector<int> empty_trig_paths_;
338 
339  //For each branch that has been marked for early deletion,
340  // keep track of how many modules are left that read this data but have
341  // not yet been run in this event.
342  std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
343  //NOTE: the following is effectively internal data for each EarlyDeleteHelper,
344  // but putting it into one vector makes for better allocation as well as
345  // faster iteration when used to reset earlyDeleteBranchToCount_.
346  // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
347  // of this vector are indexes into earlyDeleteBranchToCount_ and so
348  // tell which EarlyDeleteHelper is associated with which BranchIDs.
349  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
350  //There is one EarlyDeleteHelper per module that reads data which
351  // has been marked for early deletion.
352  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
353 
357 
 // Tested by processOneEvent and processOneStream before running end paths.
 // NOTE(review): volatile gives no atomicity or ordering guarantees; if this
 // flag is written from one thread and read from another, std::atomic<bool>
 // would be the appropriate type -- confirm the threading model first.
360  volatile bool endpathsAreActive_;
361  };
362 
363  void
364  inline
366  Service<JobReport> reportSvc;
367  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
368  }
369 
// Runs one event through this stream's schedule.
//
// Steps, in order: reset state via resetAll(); mark every empty trigger path
// as passed in results_; set up the stream context and the two sentries;
// let workerManager_ process the occurrence; run the trigger paths; run the
// results inserter (if any); run the end paths when endpathsAreActive_ is set.
//
// ep                        principal handed to every worker and path.
// es                        event setup handed to every worker and path.
// cleaningUpAfterException  forwarded to workerManager_ and to
//                           addContextAndPrintException.
//
// Any cms::Exception that escapes is given context, passed to
// addContextAndPrintException and rethrown.
370  template <typename T>
371  void StreamSchedule::processOneEvent(typename T::MyPrincipal& ep,
372  EventSetup const& es,
373  bool cleaningUpAfterException) {
374  this->resetAll();
 // Empty trigger paths have no modules to run, so record them as passed.
375  for (int empty_trig_path : empty_trig_paths_) {
376  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
377  }
378 
379  T::setStreamContext(streamContext_, ep);
 // Emits the pre signal now and the post signal when this scope is left.
380  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
381 
 // Stays armed until completedSuccessfully() is called at the end.
382  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
383  // This call takes care of the unscheduled processing.
384  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
385 
386  ++total_events_;
387  try {
388  convertException::wrap([&]() {
389  try {
 // runTriggerPaths returns results_->accept(); count accepted events.
390  if (runTriggerPaths<T>(ep, es, &streamContext_)) {
391  ++total_passed_;
392  }
393  }
394  catch(cms::Exception& e) {
 // NOTE(review): the listing jumps from line 394 to 398 here; the statements
 // that compute `action` are not visible in this extract.
398  if (action == exception_actions::SkipEvent) {
399  edm::printCmsExceptionWarning("SkipEvent", e);
400  } else {
401  throw;
402  }
403  }
404 
405  try {
406  ParentContext parentContext(&streamContext_);
407  if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, streamID_, parentContext, &streamContext_);
408  }
409  catch (cms::Exception & ex) {
 // Record which module failed and which occurrence was being processed.
410  if (T::isEvent_) {
411  ex.addContext("Calling produce method for module TriggerResultInserter");
412  }
413  std::ostringstream ost;
414  ost << "Processing " << ep.id();
415  ex.addContext(ost.str());
416  throw;
417  }
418 
419  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
 // NOTE(review): listing line 420 is not visible in this extract.
421  });
422  }
423  catch(cms::Exception& ex) {
 // Only name this function when nothing deeper has supplied context.
424  if (ex.context().empty()) {
425  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
426  } else {
427  addContextAndPrintException("", ex, cleaningUpAfterException);
428  }
429  throw;
430  }
431  terminationSentry.completedSuccessfully();
432 
433  //If we got here no other exception has happened so we can propagate any Service related exceptions
434  sentry.allowThrow();
435  }
436 
437  template <typename T>
438  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
439  EventSetup const& es,
440  bool cleaningUpAfterException) {
441  this->resetAll();
442 
443  T::setStreamContext(streamContext_, ep);
444  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
445 
446  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
447 
448  // This call takes care of the unscheduled processing.
449  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
450 
451  try {
452  convertException::wrap([&]() {
453  runTriggerPaths<T>(ep, es, &streamContext_);
454 
455  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
456  });
457  }
458  catch(cms::Exception& ex) {
459  if (ex.context().empty()) {
460  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
461  } else {
462  addContextAndPrintException("", ex, cleaningUpAfterException);
463  }
464  throw;
465  }
466  terminationSentry.completedSuccessfully();
467 
468  //If we got here no other exception has happened so we can propogate any Service related exceptions
469  sentry.allowThrow();
470  }
471 
472  template <typename T>
473  bool
474  StreamSchedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
475  for(auto& p : trig_paths_) {
476  p.processOneOccurrence<T>(ep, es, streamID_, context);
477  }
478  return results_->accept();
479  }
480 
481  template <typename T>
482  void
483  StreamSchedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
484  // Note there is no state-checking safety controlling the
485  // activation/deactivation of endpaths.
486  for(auto& p : end_paths_) {
487  p.processOneOccurrence<T>(ep, es, streamID_, context);
488  }
489  }
490 }
491 
492 #endif
RunNumber_t run() const
Definition: EventID.h:39
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
Definition: EventID.h:41
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void runEndPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:91
volatile bool endpathsAreActive_
assert(m_qm.get())
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:369
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
void reportSkipped(EventPrincipal const &ep) const
bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
ExceptionToActionTable const & actionTable() const
returns the action table
void processOneEvent(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:369
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
accept
Definition: HLTenums.h:19
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::vector< std::shared_ptr< OutputModuleCommunicator > > AllOutputModuleCommunicators
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:369
tuple out
Definition: dbtoconf.py:99
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:63
WorkerPtr results_inserter_
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
void addContext(std::string const &context)
Definition: Exception.cc:227
bool endPathsEnabled() const
double a
Definition: hdecay.h:121
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
preg
Definition: Schedule.cc:369
long double T
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:353
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:67
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:369
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process