CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
92 
93 #include <exception>
94 #include <map>
95 #include <memory>
96 #include <mutex>
97 #include <set>
98 #include <string>
99 #include <vector>
100 #include <sstream>
101 #include <atomic>
102 #include <unordered_set>
103 #include <utility>
104 
105 namespace edm {
106 
107  class BranchIDListHelper;
108  class ExceptionCollector;
109  class ExceptionToActionTable;
110  class OutputModuleCommunicator;
112  class WorkerInPath;
113  class ModuleRegistry;
114  class TriggerResultInserter;
115  class PathStatusInserter;
116  class EndPathStatusInserter;
117  class PreallocationConfiguration;
118  class ConditionalTaskHelper;
119 
120  namespace service {
121  class TriggerNamesService;
122  }
123 
125  public:
126  typedef std::vector<std::string> vstring;
127  typedef std::vector<Path> TrigPaths;
128  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
129  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
130  typedef std::shared_ptr<Worker> WorkerPtr;
131  typedef std::vector<Worker*> AllWorkers;
132 
133  typedef std::vector<Worker*> Workers;
134 
135  typedef std::vector<WorkerInPath> PathWorkers;
136 
137  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
138  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
139  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
140  std::shared_ptr<ModuleRegistry>,
141  ParameterSet& proc_pset,
142  service::TriggerNamesService const& tns,
143  PreallocationConfiguration const& prealloc,
144  ProductRegistry& pregistry,
146  std::shared_ptr<ActivityRegistry> areg,
147  std::shared_ptr<ProcessConfiguration const> processConfiguration,
149  ProcessContext const* processContext);
150 
151  StreamSchedule(StreamSchedule const&) = delete;
152 
154  WaitingTaskHolder iTask,
156  ServiceToken const& token,
157  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
158 
159  template <typename T>
161  typename T::TransitionInfoType& transitionInfo,
162  ServiceToken const& token,
163  bool cleaningUpAfterException = false);
164 
165  void beginStream();
166  void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
167 
168  StreamID streamID() const { return streamID_; }
169 
172 
176  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
177 
179  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
180 
182  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
183 
184  void moduleDescriptionsInPath(std::string const& iPathLabel,
185  std::vector<ModuleDescription const*>& descriptions,
186  unsigned int hint) const;
187 
188  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
189  std::vector<ModuleDescription const*>& descriptions,
190  unsigned int hint) const;
191 
195  int totalEvents() const { return total_events_; }
196 
199  int totalEventsPassed() const { return total_passed_; }
200 
203  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
204 
207  void getTriggerReport(TriggerReport& rep) const;
208 
210  void clearCounters();
211 
213  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
214 
216  void deleteModule(std::string const& iLabel);
217 
219  std::vector<std::string> const& branchesToDeleteEarly,
220  std::multimap<std::string, std::string> const& referencesToBranches,
221  std::vector<std::string> const& modulesToSkip,
222  edm::ProductRegistry const& preg);
223 
228 
231  }
233 
234  StreamContext const& context() const { return streamContext_; }
235 
236  struct AliasInfo {
241  };
242 
243  private:
246 
247  void resetAll();
248 
249  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
250 
251  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
252 
253  void reportSkipped(EventPrincipal const& ep) const;
254 
255  std::vector<Worker*> tryToPlaceConditionalModules(
256  Worker*,
257  std::unordered_set<std::string>& conditionalModules,
258  std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
259  std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
260  ParameterSet& proc_pset,
261  ProductRegistry& preg,
262  PreallocationConfiguration const* prealloc,
263  std::shared_ptr<ProcessConfiguration const> processConfiguration);
264  void fillWorkers(ParameterSet& proc_pset,
265  ProductRegistry& preg,
266  PreallocationConfiguration const* prealloc,
267  std::shared_ptr<ProcessConfiguration const> processConfiguration,
268  std::string const& name,
269  bool ignoreFilters,
270  PathWorkers& out,
271  std::vector<std::string> const& endPathNames,
272  ConditionalTaskHelper const& conditionalTaskHelper,
273  std::unordered_set<std::string>& allConditionalModules);
274  void fillTrigPath(ParameterSet& proc_pset,
275  ProductRegistry& preg,
276  PreallocationConfiguration const* prealloc,
277  std::shared_ptr<ProcessConfiguration const> processConfiguration,
278  int bitpos,
279  std::string const& name,
280  TrigResPtr,
281  std::vector<std::string> const& endPathNames,
282  ConditionalTaskHelper const& conditionalTaskHelper,
283  std::unordered_set<std::string>& allConditionalModules);
284  void fillEndPath(ParameterSet& proc_pset,
285  ProductRegistry& preg,
286  PreallocationConfiguration const* prealloc,
287  std::shared_ptr<ProcessConfiguration const> processConfiguration,
288  int bitpos,
289  std::string const& name,
290  std::vector<std::string> const& endPathNames,
291  ConditionalTaskHelper const& conditionalTaskHelper,
292  std::unordered_set<std::string>& allConditionalModules);
293 
294  void addToAllWorkers(Worker* w);
295 
296  void resetEarlyDelete();
297 
300 
302  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
303  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
305 
306  template <typename T>
307  void preScheduleSignal(StreamContext const*) const;
308 
309  template <typename T>
310  void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;
311 
312  void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;
313 
317  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
318 
320 
322  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
323  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
324 
327  std::vector<int> empty_trig_paths_;
328  std::vector<int> empty_end_paths_;
329 
330  //For each branch that has been marked for early deletion
331  // keep track of how many modules are left that read this data but have
332  // not yet been run in this event
333  std::vector<BranchToCount> earlyDeleteBranchToCount_;
334  //NOTE the following is effectively internal data for each EarlyDeleteHelper
335  // but putting it into one vector makes for better allocation as well as
336  // faster iteration when used to reset the earlyDeleteBranchToCount_
337  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
338  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
339  // tell which EarlyDeleteHelper is associated with which BranchIDs.
340  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
341  //There is one EarlyDeleteHelper per Module which are reading data that
342  // has been marked for early deletion
343  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
344 
348 
351  };
352 
353  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
354  Service<JobReport> reportSvc;
355  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
356  }
357 
358  template <typename T>
360  typename T::TransitionInfoType& transitionInfo,
361  ServiceToken const& token,
362  bool cleaningUpAfterException) {
363  auto group = iHolder.group();
364  auto const& principal = transitionInfo.principal();
365  T::setStreamContext(streamContext_, principal);
366 
367  ServiceWeakToken weakToken = token;
368  auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
369  std::exception_ptr const* iPtr) mutable {
370  std::exception_ptr excpt;
371  {
372  ServiceRegistry::Operate op(weakToken.lock());
373 
374  if (iPtr) {
375  excpt = *iPtr;
376  handleException(streamContext_, cleaningUpAfterException, excpt);
377  }
378  postScheduleSignal<T>(&streamContext_, excpt);
379  } // release service token before calling doneWaiting
380  iHolder.doneWaiting(excpt);
381  });
382 
383  auto task =
384  make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
385  auto token = weakToken.lock();
387  // Caught exception is propagated via WaitingTaskHolder
388  WorkerManager* workerManager = &workerManagerRuns_;
389  if (T::branchType_ == InLumi) {
390  workerManager = &workerManagerLumisAndEvents_;
391  }
392  CMS_SA_ALLOW try {
393  preScheduleSignal<T>(&streamContext_);
394  workerManager->resetAll();
395  } catch (...) {
396  // Just remember the exception at this point,
397  // let the destructor of h call doneWaiting() so the
398  // ServiceRegistry::Operator object is destroyed first
399  h.presetTaskAsFailed(std::current_exception());
400  return;
401  }
402 
404  });
405 
406  if (streamID_.value() == 0) {
407  //Enqueueing will start another thread if there is only
408  // one thread in the job. Having stream == 0 use spawn
409  // avoids starting up another thread when there is only one stream.
410  group->run([task]() {
411  TaskSentry s{task};
412  task->execute();
413  });
414  } else {
415  oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
416  arena.enqueue([task]() {
417  TaskSentry s{task};
418  task->execute();
419  });
420  }
421  }
422 
423  template <typename T>
424  void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
425  try {
426  convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
427  } catch (cms::Exception& ex) {
428  std::ostringstream ost;
429  ex.addContext("Handling pre signal, likely in a service function");
430  exceptionContext(ost, *streamContext);
431  ex.addContext(ost.str());
432  throw;
433  }
434  }
435 
436  template <typename T>
438  std::exception_ptr& excpt) const noexcept {
439  try {
440  convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
441  } catch (cms::Exception& ex) {
442  if (not excpt) {
443  std::ostringstream ost;
444  ex.addContext("Handling post signal, likely in a service function");
445  exceptionContext(ost, *streamContext);
446  ex.addContext(ost.str());
447  excpt = std::current_exception();
448  }
449  }
450  }
451 } // namespace edm
452 
453 #endif
void initializeEarlyDelete(ModuleRegistry &modReg, std::vector< std::string > const &branchesToDeleteEarly, std::multimap< std::string, std::string > const &referencesToBranches, std::vector< std::string > const &modulesToSkip, edm::ProductRegistry const &preg)
AllWorkers const & allWorkersBeginEnd() const
returns the collection of pointers to workers
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
T w() const
std::shared_ptr< HLTGlobalStatus > TrigResPtr
int totalEventsPassed() const
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
static std::mutex mutex
Definition: Proxy.cc:8
void preScheduleSignal(StreamContext const *) const
void endStream(ExceptionCollector &collector, std::mutex &collectorMutex) noexcept
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
unsigned int numberOfUnscheduledModules() const
void addToAllWorkers(Worker *w)
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration const > processConfiguration, StreamID streamID, ProcessContext const *processContext)
AllWorkers const & unscheduledWorkers() const
Definition: WorkerManager.h:83
std::vector< Worker * > tryToPlaceConditionalModules(Worker *, std::unordered_set< std::string > &conditionalModules, std::unordered_multimap< std::string, edm::BranchDescription const *> const &conditionalModuleBranches, std::unordered_multimap< std::string, AliasInfo > const &aliasMap, ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration)
edm::propagate_const< WorkerPtr > results_inserter_
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
StreamID streamID() const
WorkerManager workerManagerBeginEnd_
unsigned int number_of_unscheduled_modules_
int totalEventsFailed() const
std::shared_ptr< ActivityRegistry > actReg_
oneapi::tbb::task_group * group() const noexcept
std::vector< WorkerInPath > PathWorkers
std::vector< int > empty_end_paths_
WorkerManager workerManagerRuns_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const *> &descriptions, unsigned int hint) const
TrigResConstPtr results() const
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context) noexcept
StreamContext streamContext_
std::vector< std::string > vstring
AllWorkers const & unscheduledWorkersLumisAndEvents() const
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:82
void finishedPaths(std::atomic< std::exception_ptr *> &, WaitingTaskHolder, EventTransitionInfo &)
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1189
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void reportSkipped(EventPrincipal const &ep) const
ServiceToken lock() const
Definition: ServiceToken.h:101
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
void doneWaiting(std::exception_ptr iExcept) noexcept
int totalEvents() const
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:87
StreamContext const & context() const
void clearCounters()
Clear all the counters in the trigger report.
void getTriggerReport(TriggerReport &rep) const
TrigResPtr & results()
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
void handleException(StreamContext const &, bool cleaningUpAfterException, std::exception_ptr &) const noexcept
AllWorkers const & allWorkersLumisAndEvents() const
void addContext(std::string const &context)
Definition: Exception.cc:169
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void postScheduleSignal(StreamContext const *, std::exception_ptr &) const noexcept
HLT enums.
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
unsigned int value() const
Definition: StreamID.h:43
auto wrap(F iFunc) -> decltype(iFunc())
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames, ConditionalTaskHelper const &conditionalTaskHelper, std::unordered_set< std::string > &allConditionalModules)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
WorkerManager workerManagerLumisAndEvents_
std::vector< Path > TrigPaths
AllWorkers const & allWorkersRuns() const
def move(src, dest)
Definition: eostools.py:511
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_