CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in objects held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
90 
91 #include <map>
92 #include <memory>
93 #include <set>
94 #include <string>
95 #include <vector>
96 #include <sstream>
97 #include <atomic>
98 #include <unordered_set>
99 
100 namespace edm {
101 
102  class ActivityRegistry;
103  class BranchIDListHelper;
104  class ExceptionCollector;
105  class ExceptionToActionTable;
106  class OutputModuleCommunicator;
107  class ProcessContext;
109  class WorkerInPath;
110  class ModuleRegistry;
111  class TriggerResultInserter;
112  class PathStatusInserter;
113  class EndPathStatusInserter;
114  class PreallocationConfiguration;
115  class WaitingTaskHolder;
116 
117  class ConditionalTaskHelper;
118 
119  namespace service {
120  class TriggerNamesService;
121  }
122 
123  namespace {
124  template <typename T>
125  class StreamScheduleSignalSentry {
126  public:
127  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
128  : a_(a), context_(context), allowThrow_(false) {
129  if (a_)
130  T::preScheduleSignal(a_, context_);
131  }
132  ~StreamScheduleSignalSentry() noexcept(false) {
133  // Caught exception is rethrown (when allowed)
134  CMS_SA_ALLOW try {
135  if (a_) {
136  T::postScheduleSignal(a_, context_);
137  }
138  } catch (...) {
139  if (allowThrow_) {
140  throw;
141  }
142  }
143  }
144 
145  void allowThrow() { allowThrow_ = true; }
146 
147  private:
148  // We own none of these resources.
149  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
150  typename T::Context const* context_;
151  bool allowThrow_;
152  };
153  } // namespace
154 
156  public:
157  typedef std::vector<std::string> vstring;
158  typedef std::vector<Path> TrigPaths;
159  typedef std::vector<Path> NonTrigPaths;
160  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
161  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
162  typedef std::shared_ptr<Worker> WorkerPtr;
163  typedef std::vector<Worker*> AllWorkers;
164 
165  typedef std::vector<Worker*> Workers;
166 
167  typedef std::vector<WorkerInPath> PathWorkers;
168 
169  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
170  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
171  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
172  std::shared_ptr<ModuleRegistry>,
173  ParameterSet& proc_pset,
174  service::TriggerNamesService const& tns,
175  PreallocationConfiguration const& prealloc,
176  ProductRegistry& pregistry,
178  std::shared_ptr<ActivityRegistry> areg,
179  std::shared_ptr<ProcessConfiguration const> processConfiguration,
181  ProcessContext const* processContext);
182 
183  StreamSchedule(StreamSchedule const&) = delete;
184 
186  WaitingTaskHolder iTask,
188  ServiceToken const& token,
189  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
190 
191  template <typename T>
193  typename T::TransitionInfoType& transitionInfo,
194  ServiceToken const& token,
195  bool cleaningUpAfterException = false);
196 
197  void beginStream();
198  void endStream();
199 
200  StreamID streamID() const { return streamID_; }
201 
204 
208  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
209 
211  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
212 
214  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
215 
216  void moduleDescriptionsInPath(std::string const& iPathLabel,
217  std::vector<ModuleDescription const*>& descriptions,
218  unsigned int hint) const;
219 
220  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
221  std::vector<ModuleDescription const*>& descriptions,
222  unsigned int hint) const;
223 
227  int totalEvents() const { return total_events_; }
228 
231  int totalEventsPassed() const { return total_passed_; }
232 
235  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
236 
239  void getTriggerReport(TriggerReport& rep) const;
240 
242  void clearCounters();
243 
245  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
246 
248  void deleteModule(std::string const& iLabel);
249 
251  std::vector<std::string> const& branchesToDeleteEarly,
252  edm::ProductRegistry const& preg);
253 
255  AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
256 
259 
260  StreamContext const& context() const { return streamContext_; }
261 
262  struct AliasInfo {
267  };
268 
269  private:
270  //Sentry class to only send a signal if an
271  // exception occurs. An exception is identified
272  // by the destructor being called without first
273  // calling completedSuccessfully().
275  public:
277  : reg_(iReg), context_(iContext) {}
279  if (reg_) {
281  }
282  }
283  void completedSuccessfully() { reg_ = nullptr; }
284 
285  private:
286  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
288  };
289 
292 
293  void resetAll();
294 
295  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
296 
297  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
298 
299  void reportSkipped(EventPrincipal const& ep) const;
300 
301  std::vector<Worker*> tryToPlaceConditionalModules(
302  Worker*,
303  std::unordered_set<std::string>& conditionalModules,
304  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
305  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
306  ParameterSet& proc_pset,
307  ProductRegistry& preg,
308  PreallocationConfiguration const* prealloc,
309  std::shared_ptr<ProcessConfiguration const> processConfiguration);
310  void fillWorkers(ParameterSet& proc_pset,
311  ProductRegistry& preg,
312  PreallocationConfiguration const* prealloc,
313  std::shared_ptr<ProcessConfiguration const> processConfiguration,
314  std::string const& name,
315  bool ignoreFilters,
316  PathWorkers& out,
317  std::vector<std::string> const& endPathNames,
318  ConditionalTaskHelper const& conditionalTaskHelper);
319  void fillTrigPath(ParameterSet& proc_pset,
320  ProductRegistry& preg,
321  PreallocationConfiguration const* prealloc,
322  std::shared_ptr<ProcessConfiguration const> processConfiguration,
323  int bitpos,
324  std::string const& name,
325  TrigResPtr,
326  std::vector<std::string> const& endPathNames,
327  ConditionalTaskHelper const& conditionalTaskHelper);
328  void fillEndPath(ParameterSet& proc_pset,
329  ProductRegistry& preg,
330  PreallocationConfiguration const* prealloc,
331  std::shared_ptr<ProcessConfiguration const> processConfiguration,
332  int bitpos,
333  std::string const& name,
334  std::vector<std::string> const& endPathNames,
335  ConditionalTaskHelper const& conditionalTaskHelper);
336 
337  void addToAllWorkers(Worker* w);
338 
339  void resetEarlyDelete();
340 
343 
345  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
346  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
348 
350  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
351 
353 
355  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
356  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
357 
360  std::vector<int> empty_trig_paths_;
361  std::vector<int> empty_end_paths_;
362 
363  //For each branch that has been marked for early deletion
364  // keep track of how many modules are left that read this data but have
365  // not yet been run in this event
366  std::vector<BranchToCount> earlyDeleteBranchToCount_;
367  //NOTE the following is effectively internal data for each EarlyDeleteHelper
368  // but putting it into one vector makes for better allocation as well as
369  // faster iteration when used to reset the earlyDeleteBranchToCount_
370 // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
371 // of this vector correspond to indexes into earlyDeleteBranchToCount_ and so
372 // tell which EarlyDeleteHelper is associated with which BranchIDs.
373  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
374 //There is one EarlyDeleteHelper for each module that reads data which
375 // has been marked for early deletion
376  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
377 
381 
384  std::atomic<bool> skippingEvent_;
385  };
386 
387  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
388  Service<JobReport> reportSvc;
389  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
390  }
391 
392  template <typename T>
394  typename T::TransitionInfoType& transitionInfo,
395  ServiceToken const& token,
396  bool cleaningUpAfterException) {
397  auto const& principal = transitionInfo.principal();
398  T::setStreamContext(streamContext_, principal);
399 
400  auto id = principal.id();
401  ServiceWeakToken weakToken = token;
402  auto doneTask = make_waiting_task(
403  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
404  std::exception_ptr excpt;
405  if (iPtr) {
406  excpt = *iPtr;
407  //add context information to the exception and print message
408  try {
409  convertException::wrap([&]() { std::rethrow_exception(excpt); });
410  } catch (cms::Exception& ex) {
411  //TODO: should add the transition type info
412  std::ostringstream ost;
413  if (ex.context().empty()) {
414  ost << "Processing " << T::transitionName() << " " << id;
415  }
416  ServiceRegistry::Operate op(weakToken.lock());
417  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
418  excpt = std::current_exception();
419  }
420 
421  ServiceRegistry::Operate op(weakToken.lock());
422  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
423  }
424  // Caught exception is propagated via WaitingTaskHolder
425  CMS_SA_ALLOW try {
426  ServiceRegistry::Operate op(weakToken.lock());
427  T::postScheduleSignal(actReg_.get(), &streamContext_);
428  } catch (...) {
429  if (not excpt) {
430  excpt = std::current_exception();
431  }
432  }
433  iHolder.doneWaiting(excpt);
434  });
435 
436  auto task = make_functor_task(
437  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
438  auto token = weakToken.lock();
440  // Caught exception is propagated via WaitingTaskHolder
441  CMS_SA_ALLOW try {
442  T::preScheduleSignal(actReg_.get(), &streamContext_);
443 
445  } catch (...) {
446  h.doneWaiting(std::current_exception());
447  return;
448  }
449 
450  for (auto& p : end_paths_) {
451  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
452  }
453 
454  for (auto& p : trig_paths_) {
455  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
456  }
457 
459  });
460 
461  if (streamID_.value() == 0) {
462 //Enqueueing will start another thread if there is only
463 // one thread in the job. Having stream == 0 run the task via the task group
464 // avoids starting up another thread when there is only one stream.
465  iHolder.group()->run([task]() {
466  TaskSentry s{task};
467  task->execute();
468  });
469  } else {
470  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
471  arena.enqueue([task]() {
472  TaskSentry s{task};
473  task->execute();
474  });
475  }
476  }
477 } // namespace edm
478 
479 #endif
std::string_view transitionName(GlobalContext::Transition)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
std::vector< Path > NonTrigPaths
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
int totalEventsPassed() const
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
unsigned int numberOfUnscheduledModules() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:86
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
oneapi::tbb::task_group * group() const noexcept
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
AllWorkers const & unscheduledWorkers() const
void doneWaiting(std::exception_ptr iExcept)
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
StreamContext streamContext_
std::vector< std::string > vstring
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void reportSkipped(EventPrincipal const &ep) const
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:90
StreamContext const & context() const
void clearCounters()
Clear all the counters in the trigger report.
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, edm::ProductRegistry const &preg)
void getTriggerReport(TriggerReport &rep) const
TrigResPtr & results()
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:119
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper)
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:147
long double T
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_