CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
85 
86 #include <map>
87 #include <memory>
88 #include <set>
89 #include <string>
90 #include <vector>
91 #include <sstream>
92 #include <atomic>
93 
94 namespace edm {
95 
96  class ActivityRegistry;
97  class BranchIDListHelper;
98  class EventSetup;
99  class ExceptionCollector;
100  class OutputModuleCommunicator;
101  class ProcessContext;
103  class WorkerInPath;
104  class ModuleRegistry;
105  class TriggerResultInserter;
106  class PreallocationConfiguration;
107  class WaitingTaskHolder;
108 
109  namespace service {
110  class TriggerNamesService;
111  }
112 
113  namespace {
114  template <typename T>
115  class StreamScheduleSignalSentry {
116  public:
117  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
118  a_(a), context_(context), allowThrow_(false) {
119  if (a_) T::preScheduleSignal(a_, context_);
120  }
121  ~StreamScheduleSignalSentry() noexcept(false) {
122  try {
123  if (a_) { T::postScheduleSignal(a_, context_); }
124  } catch(...) {
125  if(allowThrow_) {throw;}
126  }
127  }
128 
129  void allowThrow() {
130  allowThrow_ = true;
131  }
132 
133  private:
134  // We own none of these resources.
135  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
136  typename T::Context const* context_;
137  bool allowThrow_;
138  };
139  }
140 
142  public:
143  typedef std::vector<std::string> vstring;
144  typedef std::vector<Path> TrigPaths;
145  typedef std::vector<Path> NonTrigPaths;
146  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
147  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
148  typedef std::shared_ptr<Worker> WorkerPtr;
149  typedef std::vector<Worker*> AllWorkers;
150 
151  typedef std::vector<Worker*> Workers;
152 
153  typedef std::vector<WorkerInPath> PathWorkers;
154 
155  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
156  std::shared_ptr<ModuleRegistry>,
157  ParameterSet& proc_pset,
160  ProductRegistry& pregistry,
161  BranchIDListHelper& branchIDListHelper,
163  std::shared_ptr<ActivityRegistry> areg,
164  std::shared_ptr<ProcessConfiguration> processConfiguration,
165  bool allowEarlyDelete,
167  ProcessContext const* processContext);
168 
169  StreamSchedule(StreamSchedule const&) = delete;
170 
172  EventPrincipal& ep,
173  EventSetup const& es);
174 
175  template <typename T>
176  void processOneStream(typename T::MyPrincipal& principal,
177  EventSetup const& eventSetup,
178  bool cleaningUpAfterException = false);
179 
180  void beginStream();
181  void endStream();
182 
183  StreamID streamID() const { return streamID_; }
184 
187 
191  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
192 
194  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
195 
199  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
200 
202  void endPaths(std::vector<std::string>& oLabelsToFill) const;
203 
205  void modulesInPath(std::string const& iPathLabel,
206  std::vector<std::string>& oLabelsToFill) const;
207 
208  void moduleDescriptionsInPath(std::string const& iPathLabel,
209  std::vector<ModuleDescription const*>& descriptions,
210  unsigned int hint) const;
211 
212  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
213  std::vector<ModuleDescription const*>& descriptions,
214  unsigned int hint) const;
215 
219  int totalEvents() const {
220  return total_events_;
221  }
222 
225  int totalEventsPassed() const {
226  return total_passed_;
227  }
228 
231  int totalEventsFailed() const {
232  return totalEvents() - totalEventsPassed();
233  }
234 
237  void enableEndPaths(bool active);
238 
241  bool endPathsEnabled() const;
242 
245  void getTriggerReport(TriggerReport& rep) const;
246 
248  void clearCounters();
249 
251  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
252 
254  AllWorkers const& allWorkers() const {
255  return workerManager_.allWorkers();
256  }
257 
258  unsigned int numberOfUnscheduledModules() const {
260  }
261 
262  StreamContext const& context() const { return streamContext_;}
263  private:
264  //Sentry class to only send a signal if an
265  // exception occurs. An exception is identified
266  // by the destructor being called without first
267  // calling completedSuccessfully().
269  public:
271  reg_(iReg),
272  context_(iContext){}
274  if(reg_) {
276  }
277  }
279  reg_ = nullptr;
280  }
281  private:
282  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
284  };
285 
288  return workerManager_.actionTable();
289  }
290 
291 
292  void resetAll();
293 
294  void finishedPaths(std::exception_ptr, WaitingTaskHolder,
295  EventPrincipal& ep, EventSetup const& es);
296  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
297 
298  template <typename T>
299  bool runTriggerPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
300 
301  template <typename T>
302  void runEndPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
303 
304  void reportSkipped(EventPrincipal const& ep) const;
305 
306  void fillWorkers(ParameterSet& proc_pset,
309  std::shared_ptr<ProcessConfiguration const> processConfiguration,
310  std::string const& name, bool ignoreFilters, PathWorkers& out,
311  vstring* labelsOnPaths);
312  void fillTrigPath(ParameterSet& proc_pset,
315  std::shared_ptr<ProcessConfiguration const> processConfiguration,
316  int bitpos, std::string const& name, TrigResPtr,
317  vstring* labelsOnTriggerPaths);
318  void fillEndPath(ParameterSet& proc_pset,
321  std::shared_ptr<ProcessConfiguration const> processConfiguration,
322  int bitpos, std::string const& name);
323 
324  void addToAllWorkers(Worker* w);
325 
326  void resetEarlyDelete();
328  edm::ParameterSet const& opts,
329  edm::ProductRegistry const& preg,
330  bool allowEarlyDelete);
331 
334 
336  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
337 
340 
342 
346  std::vector<int> empty_trig_paths_;
348 
349  //For each branch that has been marked for early deletion
350  // keep track of how many modules are left that read this data but have
351  // not yet been run in this event
352  std::vector<BranchToCount> earlyDeleteBranchToCount_;
353  //NOTE the following is effectively internal data for each EarlyDeleteHelper
354  // but putting it into one vector makes for better allocation as well as
355  // faster iteration when used to reset the earlyDeleteBranchToCount_
356  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
357  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
358  // tell which EarlyDeleteHelper is associated with which BranchIDs.
359  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
360  //There is one EarlyDeleteHelper per Module which are reading data that
361  // has been marked for early deletion
362  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
363 
367 
370  volatile bool endpathsAreActive_;
371  std::atomic<bool> skippingEvent_;
372  };
373 
374  void
375  inline
377  Service<JobReport> reportSvc;
378  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
379  }
380 
381  template <typename T>
382  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
383  EventSetup const& es,
384  bool cleaningUpAfterException) {
385  this->resetAll();
386 
387  T::setStreamContext(streamContext_, ep);
388  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
389 
390  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
391 
392  // This call takes care of the unscheduled processing.
393  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
394 
395  try {
396  convertException::wrap([&]() {
397  runTriggerPaths<T>(ep, es, &streamContext_);
398 
399  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
400  });
401  }
402  catch(cms::Exception& ex) {
403  if (ex.context().empty()) {
404  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
405  } else {
406  addContextAndPrintException("", ex, cleaningUpAfterException);
407  }
408  throw;
409  }
410  terminationSentry.completedSuccessfully();
411 
412  //If we got here no other exception has happened so we can propogate any Service related exceptions
413  sentry.allowThrow();
414  }
415 
416  template <typename T>
417  bool
418  StreamSchedule::runTriggerPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
419  for(auto& p : trig_paths_) {
420  p.processOneOccurrence<T>(ep, es, streamID_, context);
421  }
422  return results_->accept();
423  }
424 
425  template <typename T>
426  void
427  StreamSchedule::runEndPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
428  // Note there is no state-checking safety controlling the
429  // activation/deactivation of endpaths.
430  for(auto& p : end_paths_) {
431  p.processOneOccurrence<T>(ep, es, streamID_, context);
432  }
433  }
434 }
435 
436 #endif
RunNumber_t run() const
Definition: EventID.h:39
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
Definition: EventID.h:41
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:91
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:384
void addToAllWorkers(Worker *w)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:384
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
void processOneEventAsync(WaitingTaskHolder iTask, EventPrincipal &ep, EventSetup const &es)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:384
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:66
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
double a
Definition: hdecay.h:121
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
StreamContext const & context() const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
void finishedPaths(std::exception_ptr, WaitingTaskHolder, EventPrincipal &ep, EventSetup const &es)
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
preg
Definition: Schedule.cc:384
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:368
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:70
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:384
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process