test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
85 
86 #include <map>
87 #include <memory>
88 #include <set>
89 #include <string>
90 #include <vector>
91 #include <sstream>
92 #include <atomic>
93 
94 namespace edm {
95 
96  class ActivityRegistry;
97  class BranchIDListHelper;
98  class EventSetup;
99  class ExceptionCollector;
100  class OutputModuleCommunicator;
101  class ProcessContext;
103  class WorkerInPath;
104  class ModuleRegistry;
105  class TriggerResultInserter;
106  class PreallocationConfiguration;
107 
108  namespace service {
109  class TriggerNamesService;
110  }
111 
112  namespace {
113  template <typename T>
114  class StreamScheduleSignalSentry {
115  public:
116  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
117  a_(a), context_(context), allowThrow_(false) {
118  if (a_) T::preScheduleSignal(a_, context_);
119  }
120  ~StreamScheduleSignalSentry() noexcept(false) {
121  try {
122  if (a_) { T::postScheduleSignal(a_, context_); }
123  } catch(...) {
124  if(allowThrow_) {throw;}
125  }
126  }
127 
128  void allowThrow() {
129  allowThrow_ = true;
130  }
131 
132  private:
133  // We own none of these resources.
134  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
135  typename T::Context const* context_;
136  bool allowThrow_;
137  };
138  }
139 
141  public:
142  typedef std::vector<std::string> vstring;
143  typedef std::vector<Path> TrigPaths;
144  typedef std::vector<Path> NonTrigPaths;
145  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
146  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
147  typedef std::shared_ptr<Worker> WorkerPtr;
148  typedef std::vector<Worker*> AllWorkers;
149 
150  typedef std::vector<Worker*> Workers;
151 
152  typedef std::vector<WorkerInPath> PathWorkers;
153 
154  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
155  std::shared_ptr<ModuleRegistry>,
156  ParameterSet& proc_pset,
159  ProductRegistry& pregistry,
160  BranchIDListHelper& branchIDListHelper,
162  std::shared_ptr<ActivityRegistry> areg,
163  std::shared_ptr<ProcessConfiguration> processConfiguration,
164  bool allowEarlyDelete,
166  ProcessContext const* processContext);
167 
168  StreamSchedule(StreamSchedule const&) = delete;
169 
171  EventSetup const& eventSetup,
172  bool cleaningUpAfterException = false);
173 
174  template <typename T>
175  void processOneStream(typename T::MyPrincipal& principal,
176  EventSetup const& eventSetup,
177  bool cleaningUpAfterException = false);
178 
179  void beginStream();
180  void endStream();
181 
182  StreamID streamID() const { return streamID_; }
183 
186 
190  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
191 
193  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
194 
198  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
199 
201  void endPaths(std::vector<std::string>& oLabelsToFill) const;
202 
204  void modulesInPath(std::string const& iPathLabel,
205  std::vector<std::string>& oLabelsToFill) const;
206 
207  void moduleDescriptionsInPath(std::string const& iPathLabel,
208  std::vector<ModuleDescription const*>& descriptions,
209  unsigned int hint) const;
210 
211  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
212  std::vector<ModuleDescription const*>& descriptions,
213  unsigned int hint) const;
214 
218  int totalEvents() const {
219  return total_events_;
220  }
221 
224  int totalEventsPassed() const {
225  return total_passed_;
226  }
227 
230  int totalEventsFailed() const {
231  return totalEvents() - totalEventsPassed();
232  }
233 
236  void enableEndPaths(bool active);
237 
240  bool endPathsEnabled() const;
241 
244  void getTriggerReport(TriggerReport& rep) const;
245 
247  void clearCounters();
248 
250  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
251 
253  AllWorkers const& allWorkers() const {
254  return workerManager_.allWorkers();
255  }
256 
257  unsigned int numberOfUnscheduledModules() const {
259  }
260 
261  private:
262  //Sentry class to only send a signal if an
263  // exception occurs. An exception is identified
264  // by the destructor being called without first
265  // calling completedSuccessfully().
267  public:
269  reg_(iReg),
270  context_(iContext){}
272  if(reg_) {
274  }
275  }
277  reg_ = nullptr;
278  }
279  private:
280  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
282  };
283 
286  return workerManager_.actionTable();
287  }
288 
289 
290  void resetAll();
291 
292  template <typename T>
293  bool runTriggerPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
294 
295  template <typename T>
296  void runEndPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
297 
298  void reportSkipped(EventPrincipal const& ep) const;
299 
300  void fillWorkers(ParameterSet& proc_pset,
303  std::shared_ptr<ProcessConfiguration const> processConfiguration,
304  std::string const& name, bool ignoreFilters, PathWorkers& out,
305  vstring* labelsOnPaths);
306  void fillTrigPath(ParameterSet& proc_pset,
309  std::shared_ptr<ProcessConfiguration const> processConfiguration,
310  int bitpos, std::string const& name, TrigResPtr,
311  vstring* labelsOnTriggerPaths);
312  void fillEndPath(ParameterSet& proc_pset,
315  std::shared_ptr<ProcessConfiguration const> processConfiguration,
316  int bitpos, std::string const& name);
317 
318  void addToAllWorkers(Worker* w);
319 
320  void resetEarlyDelete();
322  edm::ParameterSet const& opts,
323  edm::ProductRegistry const& preg,
324  bool allowEarlyDelete);
325 
328 
330  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
331 
334 
336 
340  std::vector<int> empty_trig_paths_;
342 
343  //For each branch that has been marked for early deletion
344  // keep track of how many modules are left that read this data but have
345  // not yet been run in this event
346  std::vector<BranchToCount> earlyDeleteBranchToCount_;
347  //NOTE the following is effectively internal data for each EarlyDeleteHelper
348  // but putting it into one vector makes for better allocation as well as
349  // faster iteration when used to reset the earlyDeleteBranchToCount_
350  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
351  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
352  // tell which EarlyDeleteHelper is associated with which BranchIDs.
353  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
354  //There is one EarlyDeleteHelper per Module which are reading data that
355  // has been marked for early deletion
356  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
357 
361 
364  volatile bool endpathsAreActive_;
365  std::atomic<bool> skippingEvent_;
366  };
367 
368  void
369  inline
371  Service<JobReport> reportSvc;
372  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
373  }
374 
375  template <typename T>
376  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
377  EventSetup const& es,
378  bool cleaningUpAfterException) {
379  this->resetAll();
380 
381  T::setStreamContext(streamContext_, ep);
382  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
383 
384  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
385 
386  // This call takes care of the unscheduled processing.
387  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
388 
389  try {
390  convertException::wrap([&]() {
391  runTriggerPaths<T>(ep, es, &streamContext_);
392 
393  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
394  });
395  }
396  catch(cms::Exception& ex) {
397  if (ex.context().empty()) {
398  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
399  } else {
400  addContextAndPrintException("", ex, cleaningUpAfterException);
401  }
402  throw;
403  }
404  terminationSentry.completedSuccessfully();
405 
406  //If we got here no other exception has happened so we can propogate any Service related exceptions
407  sentry.allowThrow();
408  }
409 
410  template <typename T>
411  bool
412  StreamSchedule::runTriggerPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
413  for(auto& p : trig_paths_) {
414  p.processOneOccurrence<T>(ep, es, streamID_, context);
415  }
416  return results_->accept();
417  }
418 
419  template <typename T>
420  void
421  StreamSchedule::runEndPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
422  // Note there is no state-checking safety controlling the
423  // activation/deactivation of endpaths.
424  for(auto& p : end_paths_) {
425  p.processOneOccurrence<T>(ep, es, streamID_, context);
426  }
427  }
428 }
429 
430 #endif
RunNumber_t run() const
Definition: EventID.h:39
void triggerPaths(std::vector< std::string > &oLabelsToFill) const
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
Definition: EventID.h:41
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:90
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:383
void addToAllWorkers(Worker *w)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
actions
Definition: Schedule.cc:383
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneEvent(EventPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:383
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
bool endPathsEnabled() const
std::atomic< bool > skippingEvent_
double a
Definition: hdecay.h:121
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
auto wrap(F iFunc) -> decltype(iFunc())
volatile std::atomic< bool > shutdown_flag false
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
preg
Definition: Schedule.cc:383
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:367
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:383
void endPaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all end paths in the process