CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is kept here or in objects that this
12  class contains.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
89 
90 #include <map>
91 #include <memory>
92 #include <set>
93 #include <string>
94 #include <vector>
95 #include <sstream>
96 #include <atomic>
97 #include <unordered_set>
98 
99 namespace edm {
100 
101  class ActivityRegistry;
102  class BranchIDListHelper;
103  class ExceptionCollector;
104  class ExceptionToActionTable;
105  class OutputModuleCommunicator;
106  class ProcessContext;
108  class WorkerInPath;
109  class ModuleRegistry;
110  class TriggerResultInserter;
111  class PathStatusInserter;
112  class EndPathStatusInserter;
113  class PreallocationConfiguration;
114  class WaitingTaskHolder;
115 
116  class ConditionalTaskHelper;
117 
118  namespace service {
119  class TriggerNamesService;
120  }
121 
122  namespace { // NOTE(review): an unnamed namespace in a header gives every including translation unit its own copy of this class - confirm this is intended.
123  template <typename T>
124  class StreamScheduleSignalSentry { // Sentry: emits T::preScheduleSignal in the constructor and T::postScheduleSignal in the destructor.
125  public:
126  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
127  : a_(a), context_(context), allowThrow_(false) {
128  if (a_) // A null registry disables both the pre and the post signal.
129  T::preScheduleSignal(a_, context_);
130  }
131  ~StreamScheduleSignalSentry() noexcept(false) { // noexcept(false) because the catch block below may rethrow.
132  // Emit the post-schedule signal. An exception from it is rethrown only if allowThrow() was called; otherwise it is swallowed here.
133  CMS_SA_ALLOW try {
134  if (a_) {
135  T::postScheduleSignal(a_, context_);
136  }
137  } catch (...) {
138  if (allowThrow_) {
139  throw;
140  }
141  }
142  }
143 
144  void allowThrow() { allowThrow_ = true; } // Lets the destructor propagate an exception thrown by the post-schedule signal.
145 
146  private:
147  // We own none of these resources.
148  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
149  typename T::Context const* context_; // Passed unchanged to both signal calls.
150  bool allowThrow_; // Starts false; set to true only by allowThrow().
151  };
152  } // namespace
153 
155  public:
156  typedef std::vector<std::string> vstring;
157  typedef std::vector<Path> TrigPaths;
158  typedef std::vector<Path> NonTrigPaths;
159  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
160  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
161  typedef std::shared_ptr<Worker> WorkerPtr;
162  typedef std::vector<Worker*> AllWorkers;
163 
164  typedef std::vector<Worker*> Workers;
165 
166  typedef std::vector<WorkerInPath> PathWorkers;
167 
168  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
169  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
170  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
171  std::shared_ptr<ModuleRegistry>,
172  ParameterSet& proc_pset,
173  service::TriggerNamesService const& tns,
174  PreallocationConfiguration const& prealloc,
175  ProductRegistry& pregistry,
177  std::shared_ptr<ActivityRegistry> areg,
178  std::shared_ptr<ProcessConfiguration const> processConfiguration,
180  ProcessContext const* processContext);
181 
182  StreamSchedule(StreamSchedule const&) = delete;
183 
185  WaitingTaskHolder iTask,
187  ServiceToken const& token,
188  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
189 
190  template <typename T>
192  typename T::TransitionInfoType& transitionInfo,
193  ServiceToken const& token,
194  bool cleaningUpAfterException = false);
195 
196  void beginStream();
197  void endStream();
198 
199  StreamID streamID() const { return streamID_; }
200 
203 
207  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
208 
210  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
211 
213  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
214 
215  void moduleDescriptionsInPath(std::string const& iPathLabel,
216  std::vector<ModuleDescription const*>& descriptions,
217  unsigned int hint) const;
218 
219  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
220  std::vector<ModuleDescription const*>& descriptions,
221  unsigned int hint) const;
222 
226  int totalEvents() const { return total_events_; }
227 
230  int totalEventsPassed() const { return total_passed_; }
231 
234  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
235 
238  void getTriggerReport(TriggerReport& rep) const;
239 
241  void clearCounters();
242 
244  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
245 
247  void deleteModule(std::string const& iLabel);
248 
250  std::vector<std::string> const& branchesToDeleteEarly,
251  std::multimap<std::string, std::string> const& referencesToBranches,
252  std::vector<std::string> const& modulesToSkip,
253  edm::ProductRegistry const& preg);
254 
256  AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
257 
260 
261  StreamContext const& context() const { return streamContext_; }
262 
263  struct AliasInfo {
268  };
269 
270  private:
271  //Sentry class to only send a signal if an
272  // exception occurs. An exception is identified
273  // by the destructor being called without first
274  // calling completedSuccessfully().
276  public:
278  : reg_(iReg), context_(iContext) {}
280  if (reg_) {
282  }
283  }
284  void completedSuccessfully() { reg_ = nullptr; }
285 
286  private:
287  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
289  };
290 
293 
294  void resetAll();
295 
296  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
297 
298  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
299 
300  void reportSkipped(EventPrincipal const& ep) const;
301 
302  std::vector<Worker*> tryToPlaceConditionalModules(
303  Worker*,
304  std::unordered_set<std::string>& conditionalModules,
305  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
306  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
307  ParameterSet& proc_pset,
308  ProductRegistry& preg,
309  PreallocationConfiguration const* prealloc,
310  std::shared_ptr<ProcessConfiguration const> processConfiguration);
311  void fillWorkers(ParameterSet& proc_pset,
312  ProductRegistry& preg,
313  PreallocationConfiguration const* prealloc,
314  std::shared_ptr<ProcessConfiguration const> processConfiguration,
315  std::string const& name,
316  bool ignoreFilters,
317  PathWorkers& out,
318  std::vector<std::string> const& endPathNames,
319  ConditionalTaskHelper const& conditionalTaskHelper,
320  std::unordered_set<std::string>& allConditionalModules);
321  void fillTrigPath(ParameterSet& proc_pset,
322  ProductRegistry& preg,
323  PreallocationConfiguration const* prealloc,
324  std::shared_ptr<ProcessConfiguration const> processConfiguration,
325  int bitpos,
326  std::string const& name,
327  TrigResPtr,
328  std::vector<std::string> const& endPathNames,
329  ConditionalTaskHelper const& conditionalTaskHelper,
330  std::unordered_set<std::string>& allConditionalModules);
331  void fillEndPath(ParameterSet& proc_pset,
332  ProductRegistry& preg,
333  PreallocationConfiguration const* prealloc,
334  std::shared_ptr<ProcessConfiguration const> processConfiguration,
335  int bitpos,
336  std::string const& name,
337  std::vector<std::string> const& endPathNames,
338  ConditionalTaskHelper const& conditionalTaskHelper,
339  std::unordered_set<std::string>& allConditionalModules);
340 
341  void addToAllWorkers(Worker* w);
342 
343  void resetEarlyDelete();
344 
347 
349  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
350  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
352 
354  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
355 
357 
359  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
360  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
361 
364  std::vector<int> empty_trig_paths_;
365  std::vector<int> empty_end_paths_;
366 
367  //For each branch that has been marked for early deletion
368  // keep track of how many modules are left that read this data but have
369  // not yet been run in this event
370  std::vector<BranchToCount> earlyDeleteBranchToCount_;
371  //NOTE the following is effectively internal data for each EarlyDeleteHelper
372  // but putting it into one vector makes for better allocation as well as
373  // faster iteration when used to reset the earlyDeleteBranchToCount_
374  // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
375  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
376  // they tell which EarlyDeleteHelper is associated with which BranchIDs.
377  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
378  //There is one EarlyDeleteHelper per module that reads data which
379  // has been marked for early deletion
380  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
381 
385 
388  };
389 
390  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const { // Forwards the run and event numbers of ep to JobReport::reportSkippedEvent.
391  Service<JobReport> reportSvc; // NOTE(review): dereferenced below without an availability check - presumably the JobReport service is always configured; confirm.
392  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
393  }
394 
395  template <typename T>
397  typename T::TransitionInfoType& transitionInfo,
398  ServiceToken const& token,
399  bool cleaningUpAfterException) {
400  auto const& principal = transitionInfo.principal();
401  T::setStreamContext(streamContext_, principal);
402 
403  auto id = principal.id();
404  ServiceWeakToken weakToken = token;
405  auto doneTask = make_waiting_task(
406  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
407  std::exception_ptr excpt;
408  if (iPtr) {
409  excpt = *iPtr;
410  //add context information to the exception and print message
411  try {
412  convertException::wrap([&]() { std::rethrow_exception(excpt); });
413  } catch (cms::Exception& ex) {
414  //TODO: should add the transition type info
415  std::ostringstream ost;
416  if (ex.context().empty()) {
417  ost << "Processing " << T::transitionName() << " " << id;
418  }
419  ServiceRegistry::Operate op(weakToken.lock());
420  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
421  excpt = std::current_exception();
422  }
423 
424  ServiceRegistry::Operate op(weakToken.lock());
425  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
426  }
427  // Caught exception is propagated via WaitingTaskHolder
428  CMS_SA_ALLOW try {
429  ServiceRegistry::Operate op(weakToken.lock());
430  T::postScheduleSignal(actReg_.get(), &streamContext_);
431  } catch (...) {
432  if (not excpt) {
433  excpt = std::current_exception();
434  }
435  }
436  iHolder.doneWaiting(excpt);
437  });
438 
439  auto task = make_functor_task(
440  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
441  auto token = weakToken.lock();
443  // Caught exception is propagated via WaitingTaskHolder
444  CMS_SA_ALLOW try {
445  T::preScheduleSignal(actReg_.get(), &streamContext_);
446 
448  } catch (...) {
449  h.doneWaiting(std::current_exception());
450  return;
451  }
452 
453  for (auto& p : end_paths_) {
454  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
455  }
456 
457  for (auto& p : trig_paths_) {
458  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
459  }
460 
462  });
463 
464  if (streamID_.value() == 0) {
465  //Enqueueing will start another thread if there is only
466  // one thread in the job. Having stream == 0 run the task via the task group
467  // avoids starting up another thread when there is only one stream.
468  iHolder.group()->run([task]() {
469  TaskSentry s{task};
470  task->execute();
471  });
472  } else {
473  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
474  arena.enqueue([task]() {
475  TaskSentry s{task};
476  task->execute();
477  });
478  }
479  }
480 } // namespace edm
481 
482 #endif
std::string_view transitionName(GlobalContext::Transition)
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< Path > NonTrigPaths
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
int totalEventsPassed() const
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
unsigned int numberOfUnscheduledModules() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void addToAllWorkers(Worker *w)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:89
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
StreamID streamID() const
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
oneapi::tbb::task_group * group() const noexcept
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
AllWorkers const & unscheduledWorkers() const
void doneWaiting(std::exception_ptr iExcept)
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
StreamContext streamContext_
std::vector< std::string > vstring
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:88
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void reportSkipped(EventPrincipal const &ep) const
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
int totalEvents() const
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:93
StreamContext const & context() const
void clearCounters()
Clear all the counters in the trigger report.
void getTriggerReport(TriggerReport &rep) const
TrigResPtr & results()
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
HLT enums.
double a
Definition: hdecay.h:121
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
The Signals That Services Can Subscribe To. This is based on ActivityRegistry.h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
std::list< std::string > const & context() const
Definition: Exception.cc:151
long double T
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_