CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
90 
91 #include <map>
92 #include <memory>
93 #include <set>
94 #include <string>
95 #include <vector>
96 #include <sstream>
97 #include <atomic>
98 #include <unordered_set>
99 #include <utility>
100 
101 namespace edm {
102 
103  class BranchIDListHelper;
104  class ExceptionCollector;
105  class ExceptionToActionTable;
106  class OutputModuleCommunicator;
108  class WorkerInPath;
109  class ModuleRegistry;
110  class TriggerResultInserter;
111  class PathStatusInserter;
112  class EndPathStatusInserter;
113  class PreallocationConfiguration;
114  class WaitingTaskHolder;
115 
116  class ConditionalTaskHelper;
117 
118  namespace service {
119  class TriggerNamesService;
120  }
121 
123  public:
124  typedef std::vector<std::string> vstring;
125  typedef std::vector<Path> TrigPaths;
126  typedef std::vector<Path> NonTrigPaths;
127  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
128  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
129  typedef std::shared_ptr<Worker> WorkerPtr;
130  typedef std::vector<Worker*> AllWorkers;
131 
132  typedef std::vector<Worker*> Workers;
133 
134  typedef std::vector<WorkerInPath> PathWorkers;
135 
136  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
137  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
138  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
139  std::shared_ptr<ModuleRegistry>,
140  ParameterSet& proc_pset,
141  service::TriggerNamesService const& tns,
142  PreallocationConfiguration const& prealloc,
143  ProductRegistry& pregistry,
145  std::shared_ptr<ActivityRegistry> areg,
146  std::shared_ptr<ProcessConfiguration const> processConfiguration,
148  ProcessContext const* processContext);
149 
150  StreamSchedule(StreamSchedule const&) = delete;
151 
153  WaitingTaskHolder iTask,
155  ServiceToken const& token,
156  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
157 
158  template <typename T>
160  typename T::TransitionInfoType& transitionInfo,
161  ServiceToken const& token,
162  bool cleaningUpAfterException = false);
163 
164  void beginStream();
165  void endStream();
166 
167  StreamID streamID() const { return streamID_; }
168 
171 
175  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
176 
178  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
179 
181  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
182 
183  void moduleDescriptionsInPath(std::string const& iPathLabel,
184  std::vector<ModuleDescription const*>& descriptions,
185  unsigned int hint) const;
186 
187  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
188  std::vector<ModuleDescription const*>& descriptions,
189  unsigned int hint) const;
190 
194  int totalEvents() const { return total_events_; }
195 
198  int totalEventsPassed() const { return total_passed_; }
199 
202  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
203 
206  void getTriggerReport(TriggerReport& rep) const;
207 
209  void clearCounters();
210 
212  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
213 
215  void deleteModule(std::string const& iLabel);
216 
218  std::vector<std::string> const& branchesToDeleteEarly,
219  std::multimap<std::string, std::string> const& referencesToBranches,
220  std::vector<std::string> const& modulesToSkip,
221  edm::ProductRegistry const& preg);
222 
227 
230  }
232 
233  StreamContext const& context() const { return streamContext_; }
234 
235  struct AliasInfo {
240  };
241 
242  private:
245 
246  void resetAll();
247 
248  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
249 
250  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
251 
252  void reportSkipped(EventPrincipal const& ep) const;
253 
254  std::vector<Worker*> tryToPlaceConditionalModules(
255  Worker*,
256  std::unordered_set<std::string>& conditionalModules,
257  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
258  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
259  ParameterSet& proc_pset,
260  ProductRegistry& preg,
261  PreallocationConfiguration const* prealloc,
262  std::shared_ptr<ProcessConfiguration const> processConfiguration);
263  void fillWorkers(ParameterSet& proc_pset,
264  ProductRegistry& preg,
265  PreallocationConfiguration const* prealloc,
266  std::shared_ptr<ProcessConfiguration const> processConfiguration,
267  std::string const& name,
268  bool ignoreFilters,
269  PathWorkers& out,
270  std::vector<std::string> const& endPathNames,
271  ConditionalTaskHelper const& conditionalTaskHelper,
272  std::unordered_set<std::string>& allConditionalModules);
273  void fillTrigPath(ParameterSet& proc_pset,
274  ProductRegistry& preg,
275  PreallocationConfiguration const* prealloc,
276  std::shared_ptr<ProcessConfiguration const> processConfiguration,
277  int bitpos,
278  std::string const& name,
279  TrigResPtr,
280  std::vector<std::string> const& endPathNames,
281  ConditionalTaskHelper const& conditionalTaskHelper,
282  std::unordered_set<std::string>& allConditionalModules);
283  void fillEndPath(ParameterSet& proc_pset,
284  ProductRegistry& preg,
285  PreallocationConfiguration const* prealloc,
286  std::shared_ptr<ProcessConfiguration const> processConfiguration,
287  int bitpos,
288  std::string const& name,
289  std::vector<std::string> const& endPathNames,
290  ConditionalTaskHelper const& conditionalTaskHelper,
291  std::unordered_set<std::string>& allConditionalModules);
292 
293  void addToAllWorkers(Worker* w);
294 
295  void resetEarlyDelete();
296 
299 
301  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
302  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
304 
305  template <typename T>
306  void preScheduleSignal(StreamContext const*) const;
307 
308  template <typename T>
309  void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
310 
311  void handleException(StreamContext const&,
312  ServiceWeakToken const&,
313  bool cleaningUpAfterException,
314  std::exception_ptr&) const noexcept;
315 
319  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
320 
322 
324  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
325  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
326 
329  std::vector<int> empty_trig_paths_;
330  std::vector<int> empty_end_paths_;
331 
332  //For each branch that has been marked for early deletion
333  // keep track of how many modules are left that read this data but have
334  // not yet been run in this event
335  std::vector<BranchToCount> earlyDeleteBranchToCount_;
336  //NOTE the following is effectively internal data for each EarlyDeleteHelper
337  // but putting it into one vector makes for better allocation as well as
338  // faster iteration when used to reset the earlyDeleteBranchToCount_
339  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
340  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
341  // tell which EarlyDeleteHelper is associated with which BranchIDs.
342  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
343  //There is one EarlyDeleteHelper per Module which are reading data that
344  // has been marked for early deletion
345  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
346 
350 
353  };
354 
355  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
356  Service<JobReport> reportSvc;
357  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
358  }
359 
360  template <typename T>
362  typename T::TransitionInfoType& transitionInfo,
363  ServiceToken const& token,
364  bool cleaningUpAfterException) {
365  auto group = iHolder.group();
366  auto const& principal = transitionInfo.principal();
367  T::setStreamContext(streamContext_, principal);
368 
369  ServiceWeakToken weakToken = token;
370  auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
371  std::exception_ptr const* iPtr) mutable {
372  std::exception_ptr excpt;
373  if (iPtr) {
374  excpt = *iPtr;
375  handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
376  }
377  postScheduleSignal<T>(&streamContext_, weakToken, excpt);
378  iHolder.doneWaiting(excpt);
379  });
380 
381  auto task =
382  make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
383  auto token = weakToken.lock();
385  // Caught exception is propagated via WaitingTaskHolder
386  WorkerManager* workerManager = &workerManagerRuns_;
387  if (T::branchType_ == InLumi) {
388  workerManager = &workerManagerLumisAndEvents_;
389  }
390  CMS_SA_ALLOW try {
391  preScheduleSignal<T>(&streamContext_);
392  workerManager->resetAll();
393  } catch (...) {
394  h.doneWaiting(std::current_exception());
395  return;
396  }
397 
399  });
400 
401  if (streamID_.value() == 0) {
402  //Enqueueing will start another thread if there is only
403  // one thread in the job. Having stream == 0 use spawn
404  // avoids starting up another thread when there is only one stream.
405  group->run([task]() {
406  TaskSentry s{task};
407  task->execute();
408  });
409  } else {
410  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
411  arena.enqueue([task]() {
412  TaskSentry s{task};
413  task->execute();
414  });
415  }
416  }
417 
418  template <typename T>
419  void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
420  try {
421  convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
422  } catch (cms::Exception& ex) {
423  std::ostringstream ost;
424  ex.addContext("Handling pre signal, likely in a service function");
425  exceptionContext(ost, *streamContext);
426  ex.addContext(ost.str());
427  throw;
428  }
429  }
430 
431  template <typename T>
433  ServiceWeakToken const& weakToken,
434  std::exception_ptr& excpt) const noexcept {
435  try {
436  convertException::wrap([this, &weakToken, streamContext]() {
437  ServiceRegistry::Operate op(weakToken.lock());
438  T::postScheduleSignal(actReg_.get(), streamContext);
439  });
440  } catch (cms::Exception& ex) {
441  if (not excpt) {
442  std::ostringstream ost;
443  ex.addContext("Handling post signal, likely in a service function");
444  exceptionContext(ost, *streamContext);
445  ex.addContext(ost.str());
446  excpt = std::current_exception();
447  }
448  }
449  }
450 } // namespace edm
451 
452 #endif
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
void postScheduleSignal(StreamContext const *, ServiceWeakToken const &, std::exception_ptr &) const noexcept
std::vector< Path > NonTrigPaths
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
int totalEventsPassed() const
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
void preScheduleSignal(StreamContext const *) const
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
unsigned int numberOfUnscheduledModules() const
void addToAllWorkers(Worker *w)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:81
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
std::shared_ptr< ActivityRegistry > actReg_
oneapi::tbb::task_group * group() const noexcept
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
WorkerManager workerManagerRuns_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
StreamContext streamContext_
std::vector< std::string > vstring
AllWorkers const & unscheduledWorkersLumisAndEvents() const
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:80
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void handleException(StreamContext const &, ServiceWeakToken const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void reportSkipped(EventPrincipal const &ep) const
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void doneWaiting(std::exception_ptr iExcept) noexcept
int totalEvents() const
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:85
StreamContext const & context() const
void clearCounters()
Clear all the counters in the trigger report.
void getTriggerReport(TriggerReport &rep) const
TrigResPtr & results()
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
AllWorkers const & allWorkersLumisAndEvents() const
void addContext(std::string const &context)
Definition: Exception.cc:169
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
HLT enums.
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
WorkerManager workerManagerLumisAndEvents_
std::vector< Path > TrigPaths
AllWorkers const & allWorkersRuns() const
def move(src, dest)
Definition: eostools.py:511
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_