CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in objects held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
90 
91 #include <map>
92 #include <memory>
93 #include <set>
94 #include <string>
95 #include <vector>
96 #include <sstream>
97 #include <atomic>
98 #include <unordered_set>
99 
100 namespace edm {
101 
102  class ActivityRegistry;
103  class BranchIDListHelper;
104  class ExceptionCollector;
105  class ExceptionToActionTable;
106  class OutputModuleCommunicator;
107  class ProcessContext;
109  class WorkerInPath;
110  class ModuleRegistry;
111  class TriggerResultInserter;
112  class PathStatusInserter;
113  class EndPathStatusInserter;
114  class PreallocationConfiguration;
115  class WaitingTaskHolder;
116 
117  namespace service {
118  class TriggerNamesService;
119  }
120 
121  namespace {
122  template <typename T>
123  class StreamScheduleSignalSentry {
124  public:
125  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
126  : a_(a), context_(context), allowThrow_(false) {
127  if (a_)
128  T::preScheduleSignal(a_, context_);
129  }
130  ~StreamScheduleSignalSentry() noexcept(false) {
131  // Caught exception is rethrown (when allowed)
132  CMS_SA_ALLOW try {
133  if (a_) {
134  T::postScheduleSignal(a_, context_);
135  }
136  } catch (...) {
137  if (allowThrow_) {
138  throw;
139  }
140  }
141  }
142 
143  void allowThrow() { allowThrow_ = true; }
144 
145  private:
146  // We own none of these resources.
147  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
148  typename T::Context const* context_;
149  bool allowThrow_;
150  };
151  } // namespace
152 
154  public:
155  typedef std::vector<std::string> vstring;
156  typedef std::vector<Path> TrigPaths;
157  typedef std::vector<Path> NonTrigPaths;
158  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
159  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
160  typedef std::shared_ptr<Worker> WorkerPtr;
161  typedef std::vector<Worker*> AllWorkers;
162 
163  typedef std::vector<Worker*> Workers;
164 
165  typedef std::vector<WorkerInPath> PathWorkers;
166 
167  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
168  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
169  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
170  std::shared_ptr<ModuleRegistry>,
171  ParameterSet& proc_pset,
172  service::TriggerNamesService const& tns,
173  PreallocationConfiguration const& prealloc,
174  ProductRegistry& pregistry,
175  BranchIDListHelper& branchIDListHelper,
177  std::shared_ptr<ActivityRegistry> areg,
178  std::shared_ptr<ProcessConfiguration> processConfiguration,
180  ProcessContext const* processContext);
181 
182  StreamSchedule(StreamSchedule const&) = delete;
183 
185  WaitingTaskHolder iTask,
187  ServiceToken const& token,
188  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
189 
190  template <typename T>
192  typename T::TransitionInfoType& transitionInfo,
193  ServiceToken const& token,
194  bool cleaningUpAfterException = false);
195 
196  void beginStream();
197  void endStream();
198 
199  StreamID streamID() const { return streamID_; }
200 
203 
207  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
208 
210  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
211 
213  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
214 
215  void moduleDescriptionsInPath(std::string const& iPathLabel,
216  std::vector<ModuleDescription const*>& descriptions,
217  unsigned int hint) const;
218 
219  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
220  std::vector<ModuleDescription const*>& descriptions,
221  unsigned int hint) const;
222 
226  int totalEvents() const { return total_events_; }
227 
230  int totalEventsPassed() const { return total_passed_; }
231 
234  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
235 
238  void getTriggerReport(TriggerReport& rep) const;
239 
241  void clearCounters();
242 
244  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
245 
247  void deleteModule(std::string const& iLabel);
248 
250  std::vector<std::string> const& branchesToDeleteEarly,
251  edm::ProductRegistry const& preg);
252 
254  AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
255 
257 
258  StreamContext const& context() const { return streamContext_; }
259 
260  private:
261  //Sentry class to only send a signal if an
262  // exception occurs. An exception is identified
263  // by the destructor being called without first
264  // calling completedSuccessfully().
266  public:
268  : reg_(iReg), context_(iContext) {}
270  if (reg_) {
272  }
273  }
274  void completedSuccessfully() { reg_ = nullptr; }
275 
276  private:
277  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
279  };
280 
283 
284  void resetAll();
285 
286  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
287 
288  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
289 
290  void reportSkipped(EventPrincipal const& ep) const;
291 
292  struct AliasInfo {
297  };
298  std::vector<Worker*> tryToPlaceConditionalModules(
299  Worker*,
300  std::unordered_set<std::string>& conditionalModules,
301  std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
302  std::multimap<std::string, AliasInfo> const& aliasMap,
303  ParameterSet& proc_pset,
304  ProductRegistry& preg,
305  PreallocationConfiguration const* prealloc,
306  std::shared_ptr<ProcessConfiguration const> processConfiguration);
307  void fillWorkers(ParameterSet& proc_pset,
308  ProductRegistry& preg,
309  PreallocationConfiguration const* prealloc,
310  std::shared_ptr<ProcessConfiguration const> processConfiguration,
311  std::string const& name,
312  bool ignoreFilters,
313  PathWorkers& out,
314  std::vector<std::string> const& endPathNames);
315  void fillTrigPath(ParameterSet& proc_pset,
316  ProductRegistry& preg,
317  PreallocationConfiguration const* prealloc,
318  std::shared_ptr<ProcessConfiguration const> processConfiguration,
319  int bitpos,
320  std::string const& name,
321  TrigResPtr,
322  std::vector<std::string> const& endPathNames);
323  void fillEndPath(ParameterSet& proc_pset,
324  ProductRegistry& preg,
325  PreallocationConfiguration const* prealloc,
326  std::shared_ptr<ProcessConfiguration const> processConfiguration,
327  int bitpos,
328  std::string const& name,
329  std::vector<std::string> const& endPathNames);
330 
331  void addToAllWorkers(Worker* w);
332 
333  void resetEarlyDelete();
334 
337 
339  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
340  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
342 
344  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
345 
347 
349  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
350  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
351 
354  std::vector<int> empty_trig_paths_;
355  std::vector<int> empty_end_paths_;
356 
357  //For each branch that has been marked for early deletion
358  // keep track of how many modules are left that read this data but have
359  // not yet been run in this event
360  std::vector<BranchToCount> earlyDeleteBranchToCount_;
361  //NOTE the following is effectively internal data for each EarlyDeleteHelper
362  // but putting it into one vector makes for better allocation as well as
363  // faster iteration when used to reset the earlyDeleteBranchToCount_
364  // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
365  // of this vector correspond to indexes into earlyDeleteBranchToCount_, so they
366  // tell which EarlyDeleteHelper is associated with which BranchIDs.
367  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
368  //There is one EarlyDeleteHelper for each module that reads data which
369  // has been marked for early deletion.
370  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
371 
375 
378  std::atomic<bool> skippingEvent_;
379  };
380 
381  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
382  Service<JobReport> reportSvc;
383  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
384  }
385 
386  template <typename T>
388  typename T::TransitionInfoType& transitionInfo,
389  ServiceToken const& token,
390  bool cleaningUpAfterException) {
391  auto const& principal = transitionInfo.principal();
392  T::setStreamContext(streamContext_, principal);
393 
394  auto id = principal.id();
395  ServiceWeakToken weakToken = token;
396  auto doneTask = make_waiting_task(
397  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
398  std::exception_ptr excpt;
399  if (iPtr) {
400  excpt = *iPtr;
401  //add context information to the exception and print message
402  try {
403  convertException::wrap([&]() { std::rethrow_exception(excpt); });
404  } catch (cms::Exception& ex) {
405  //TODO: should add the transition type info
406  std::ostringstream ost;
407  if (ex.context().empty()) {
408  ost << "Processing " << T::transitionName() << " " << id;
409  }
410  ServiceRegistry::Operate op(weakToken.lock());
411  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
412  excpt = std::current_exception();
413  }
414 
415  ServiceRegistry::Operate op(weakToken.lock());
416  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
417  }
418  // Caught exception is propagated via WaitingTaskHolder
419  CMS_SA_ALLOW try {
420  ServiceRegistry::Operate op(weakToken.lock());
421  T::postScheduleSignal(actReg_.get(), &streamContext_);
422  } catch (...) {
423  if (not excpt) {
424  excpt = std::current_exception();
425  }
426  }
427  iHolder.doneWaiting(excpt);
428  });
429 
430  auto task = make_functor_task(
431  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
432  auto token = weakToken.lock();
434  // Caught exception is propagated via WaitingTaskHolder
435  CMS_SA_ALLOW try {
436  T::preScheduleSignal(actReg_.get(), &streamContext_);
437 
439  } catch (...) {
440  h.doneWaiting(std::current_exception());
441  return;
442  }
443 
444  for (auto& p : end_paths_) {
445  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
446  }
447 
448  for (auto& p : trig_paths_) {
449  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
450  }
451 
453  });
454 
455  if (streamID_.value() == 0) {
456  //Enqueueing will start another thread if there is only
457  // one thread in the job. Having stream == 0 run the task through the
458  // task group instead avoids starting up another thread when there is only one stream.
459  iHolder.group()->run([task]() {
460  TaskSentry s{task};
461  task->execute();
462  });
463  } else {
464  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
465  arena.enqueue([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  }
470  }
471 } // namespace edm
472 
473 #endif
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
std::vector< Path > NonTrigPaths
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
int totalEventsPassed() const
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
unsigned int numberOfUnscheduledModules() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
edm::propagate_const< WorkerPtr > results_inserter_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
StreamID streamID() const
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
oneapi::tbb::task_group * group() const noexcept
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
void doneWaiting(std::exception_ptr iExcept)
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
StreamContext streamContext_
std::vector< std::string > vstring
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, StreamID streamID, ProcessContext const *processContext)
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void reportSkipped(EventPrincipal const &ep) const
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
StreamContext const & context() const
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
void getTriggerReport(TriggerReport &rep) const
TrigResPtr & results()
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:119
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_