StreamSchedule.h
#ifndef FWCore_Framework_StreamSchedule_h
#define FWCore_Framework_StreamSchedule_h

/*
  Author: Jim Kowalkowski 28-01-06

  A class for creating a schedule based on paths in the configuration file.
  The schedule is maintained as a sequence of paths.
  After construction, events can be fed to the object and passed through
  all the modules in the schedule. All accounting about processing
  of events by modules and paths is contained here or in objects held
  by containment.

  The trigger results producer and product are generated and managed here.
  This class also manages endpaths and calls to endjob and beginjob.
  Endpaths are treated as a simple list of modules that need to
  process the event and do not participate in trigger path
  activities.

  This class requires the high-level process pset. It uses @process_name.
  If the high-level pset contains an "options" pset, then the
  following optional parameter can be present:
    bool wantSummary = true/false   # default false

  wantSummary indicates whether or not the pass/fail/error stats
  for modules and paths should be printed at the end of the job.

  A TriggerResults object will always be inserted into the event
  for any schedule. The producer of the TriggerResults EDProduct
  is always the first module in the endpath. The TriggerResultInserter
  is given a fixed label of "TriggerResults".

  Processing of an event happens by pushing the event through the Paths.
  The scheduler performs the reset() on each of the workers independently
  of the Path objects.

  ------------------------

  About Paths:
  Paths fit into two categories:
    1) trigger paths that contribute directly to saved trigger bits
    2) end paths
  The StreamSchedule holds these paths in two data structures:
    1) main path list
    2) end path list

  Trigger path processing always precedes endpath processing.
  The order of the paths from the input configuration is
  preserved in the main paths list.

  ------------------------

  The StreamSchedule uses the TriggerNamesService to get the names of the
  trigger paths and end paths. When a TriggerResults object is created,
  the results are stored in the same order as the trigger names from
  TriggerNamesService.

*/

#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <sstream>
#include <atomic>

namespace edm {

  class ActivityRegistry;
  class BranchIDListHelper;
  class ExceptionCollector;
  class ExceptionToActionTable;
  class OutputModuleCommunicator;
  class ProcessContext;
  class WorkerInPath;
  class ModuleRegistry;
  class TriggerResultInserter;
  class PathStatusInserter;
  class EndPathStatusInserter;
  class PreallocationConfiguration;
  class WaitingTaskHolder;

  namespace service {
    class TriggerNamesService;
  }

  namespace {
    template <typename T>
    class StreamScheduleSignalSentry {
    public:
      StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
          : a_(a), context_(context), allowThrow_(false) {
        if (a_)
          T::preScheduleSignal(a_, context_);
      }
      ~StreamScheduleSignalSentry() noexcept(false) {
        // Caught exception is rethrown (when allowed)
        CMS_SA_ALLOW try {
          if (a_) {
            T::postScheduleSignal(a_, context_);
          }
        } catch (...) {
          if (allowThrow_) {
            throw;
          }
        }
      }

      void allowThrow() { allowThrow_ = true; }

    private:
      // We own none of these resources.
      ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
      typename T::Context const* context_;
      bool allowThrow_;
    };
  }  // namespace

  class StreamSchedule {
  public:
    typedef std::vector<std::string> vstring;
    typedef std::vector<Path> TrigPaths;
    typedef std::vector<Path> NonTrigPaths;
    typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
    typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
    typedef std::shared_ptr<Worker> WorkerPtr;
    typedef std::vector<Worker*> AllWorkers;

    typedef std::vector<Worker*> Workers;

    typedef std::vector<WorkerInPath> PathWorkers;

    StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
                   std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
                   std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
                   std::shared_ptr<ModuleRegistry>,
                   ParameterSet& proc_pset,
                   service::TriggerNamesService const& tns,
                   PreallocationConfiguration const& prealloc,
                   ProductRegistry& pregistry,
                   BranchIDListHelper& branchIDListHelper,
                   ExceptionToActionTable const& actions,
                   std::shared_ptr<ActivityRegistry> areg,
                   std::shared_ptr<ProcessConfiguration> processConfiguration,
                   StreamID streamID,
                   ProcessContext const* processContext);

    StreamSchedule(StreamSchedule const&) = delete;

    void processOneEventAsync(
        WaitingTaskHolder iTask,
        EventTransitionInfo&,
        ServiceToken const& token,
        std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);

    template <typename T>
    void processOneStreamAsync(WaitingTaskHolder iTask,
                               typename T::TransitionInfoType& transitionInfo,
                               ServiceToken const& token,
                               bool cleaningUpAfterException = false);

    void beginStream();
    void endStream();

    StreamID streamID() const { return streamID_; }

    std::vector<ModuleDescription const*> getAllModuleDescriptions() const;

    /// adds to oLabelsToFill the labels for all paths in the process
    void availablePaths(std::vector<std::string>& oLabelsToFill) const;

    void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;

    void moduleDescriptionsInPath(std::string const& iPathLabel,
                                  std::vector<ModuleDescription const*>& descriptions,
                                  unsigned int hint) const;

    void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
                                     std::vector<ModuleDescription const*>& descriptions,
                                     unsigned int hint) const;

    int totalEvents() const { return total_events_; }

    int totalEventsPassed() const { return total_passed_; }

    int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }

    void getTriggerReport(TriggerReport& rep) const;

    /// Clear all the counters in the trigger report.
    void clearCounters();

    /// clone the type of module with label iLabel but configure with iPSet.
    void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);

    /// Delete the module with label iLabel.
    void deleteModule(std::string const& iLabel);

    void initializeEarlyDelete(ModuleRegistry& modReg,
                               std::vector<std::string> const& branchesToDeleteEarly,
                               edm::ProductRegistry const& preg);

    /// returns the collection of pointers to workers
    AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }

    StreamContext const& context() const { return streamContext_; }

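  private:
    // Sentry class that sends a signal only if an
    // exception occurs. An exception is identified
    // by the destructor being called without first
    // calling completedSuccessfully().
    class SendTerminationSignalIfException {
    public:
      SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
          : reg_(iReg), context_(iContext) {}
      ~SendTerminationSignalIfException() {
        if (reg_) {
          reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
        }
      }
      void completedSuccessfully() { reg_ = nullptr; }

    private:
      edm::ActivityRegistry* reg_;  // We do not use propagate_const because the registry itself is mutable.
      edm::StreamContext const* context_;
    };
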
    void resetAll();

    void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);

    std::exception_ptr finishProcessOneEvent(std::exception_ptr);

    void reportSkipped(EventPrincipal const& ep) const;

    void fillWorkers(ParameterSet& proc_pset,
                     ProductRegistry& preg,
                     PreallocationConfiguration const* prealloc,
                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
                     std::string const& name,
                     bool ignoreFilters,
                     PathWorkers& out,
                     std::vector<std::string> const& endPathNames);
    void fillTrigPath(ParameterSet& proc_pset,
                      ProductRegistry& preg,
                      PreallocationConfiguration const* prealloc,
                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
                      int bitpos,
                      std::string const& name,
                      TrigResPtr,
                      std::vector<std::string> const& endPathNames);
    void fillEndPath(ParameterSet& proc_pset,
                     ProductRegistry& preg,
                     PreallocationConfiguration const* prealloc,
                     std::shared_ptr<ProcessConfiguration const> processConfiguration,
                     int bitpos,
                     std::string const& name,
                     std::vector<std::string> const& endPathNames);

    void addToAllWorkers(Worker* w);

    void resetEarlyDelete();

    void makePathStatusInserters(
        std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
        std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
        ExceptionToActionTable const& actions);

    WorkerManager workerManager_;
    std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.

    edm::propagate_const<TrigResPtr> results_;

    edm::propagate_const<WorkerPtr> results_inserter_;
    std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
    std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;

    std::vector<int> empty_trig_paths_;
    std::vector<int> empty_end_paths_;

    // For each branch that has been marked for early deletion,
    // keep track of how many modules are left that read this data but have
    // not yet been run in this event.
    std::vector<BranchToCount> earlyDeleteBranchToCount_;
    // NOTE: the following is effectively internal data for each EarlyDeleteHelper,
    // but putting it into one vector makes for better allocation as well as
    // faster iteration when used to reset earlyDeleteBranchToCount_.
    // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
    // of this vector are indexes into earlyDeleteBranchToCount_, so they
    // tell which EarlyDeleteHelper is associated with which BranchIDs.
    std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
    // There is one EarlyDeleteHelper per module that reads data which
    // has been marked for early deletion.
    std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;

    unsigned int number_of_unscheduled_modules_;

    StreamContext streamContext_;
    std::atomic<bool> skippingEvent_;
  };

  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
    Service<JobReport> reportSvc;
    reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
  }

  template <typename T>
  void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
                                             typename T::TransitionInfoType& transitionInfo,
                                             ServiceToken const& token,
                                             bool cleaningUpAfterException) {
    auto const& principal = transitionInfo.principal();
    T::setStreamContext(streamContext_, principal);

    auto id = principal.id();
    ServiceWeakToken weakToken = token;
    auto doneTask = make_waiting_task(
        [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
          std::exception_ptr excpt;
          if (iPtr) {
            excpt = *iPtr;
            // add context information to the exception and print message
            try {
              convertException::wrap([&]() { std::rethrow_exception(excpt); });
            } catch (cms::Exception& ex) {
              // TODO: should add the transition type info
              std::ostringstream ost;
              if (ex.context().empty()) {
                ost << "Processing " << T::transitionName() << " " << id;
              }
              ServiceRegistry::Operate op(weakToken.lock());
              addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
              excpt = std::current_exception();
            }

            ServiceRegistry::Operate op(weakToken.lock());
            actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
          }
          // Caught exception is propagated via WaitingTaskHolder
          CMS_SA_ALLOW try {
            ServiceRegistry::Operate op(weakToken.lock());
            T::postScheduleSignal(actReg_.get(), &streamContext_);
          } catch (...) {
            if (not excpt) {
              excpt = std::current_exception();
            }
          }
          iHolder.doneWaiting(excpt);
        });

    auto task = make_functor_task(
        [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
          auto token = weakToken.lock();
          ServiceRegistry::Operate op(token);
          // Caught exception is propagated via WaitingTaskHolder
          CMS_SA_ALLOW try {
            T::preScheduleSignal(actReg_.get(), &streamContext_);

          } catch (...) {
            h.doneWaiting(std::current_exception());
            return;
          }

          for (auto& p : end_paths_) {
            p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
          }

          for (auto& p : trig_paths_) {
            p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
          }

        });

    if (streamID_.value() == 0) {
      // Enqueueing will start another thread if there is only
      // one thread in the job. Having stream 0 run the task through
      // the task group instead avoids starting up another thread
      // when there is only one stream.
      iHolder.group()->run([task]() {
        TaskSentry s{task};
        task->execute();
      });
    } else {
      oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
      arena.enqueue([task]() {
        TaskSentry s{task};
        task->execute();
      });
    }
  }
}  // namespace edm

#endif
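
The SendTerminationSignalIfException sentry in the listing above reports a failure only when its destructor runs before completedSuccessfully() has been called. The following is a minimal standalone sketch of that idiom, not the framework code: the class name SignalOnFailureSentry, the std::function callback and the helper runGuarded are all hypothetical stand-ins for the ActivityRegistry signal.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for the sentry: it invokes a callback from its
// destructor unless completedSuccessfully() disarmed it first.
class SignalOnFailureSentry {
public:
  explicit SignalOnFailureSentry(std::function<void()> onFailure) : onFailure_(std::move(onFailure)) {}
  SignalOnFailureSentry(SignalOnFailureSentry const&) = delete;
  SignalOnFailureSentry& operator=(SignalOnFailureSentry const&) = delete;
  ~SignalOnFailureSentry() {
    if (onFailure_) {
      onFailure_();  // reached only when the guarded scope exited early
    }
  }
  // Disarm the sentry; plays the role of resetting reg_ to nullptr.
  void completedSuccessfully() { onFailure_ = nullptr; }

private:
  std::function<void()> onFailure_;
};

// Hypothetical helper: runs a guarded scope and returns how many times
// the failure callback fired.
int runGuarded(bool shouldThrow) {
  int signals = 0;
  try {
    SignalOnFailureSentry sentry([&signals] { ++signals; });
    if (shouldThrow) {
      throw std::runtime_error("module failed");
    }
    sentry.completedSuccessfully();
  } catch (std::runtime_error const&) {
    // The sentry has already been destroyed during stack unwinding.
  }
  return signals;
}
```

In this sketch, runGuarded(true) fires the callback once during unwinding, while runGuarded(false) disarms the sentry and fires nothing. The design choice is that the success path must opt out explicitly, so every early exit is treated as a failure by default.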
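
The comments on earlyDeleteBranchToCount_ and earlyDeleteHelperToBranchIndicies_ describe per-branch reader counts plus one flat index vector that every helper addresses through a begin/end range. The sketch below illustrates that layout under stated assumptions: the types BranchCount, HelperRange and EarlyDeleteBook and the functions addReader, reset, moduleDone and makeExampleBook are hypothetical, and the reset logic is an assumption about how the counts could be rebuilt per event, not the framework implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical per-branch record: how many readers have not yet run.
struct BranchCount {
  std::string branch;
  unsigned int count;
};

// Hypothetical half-open range [begin, end) into the flat index vector.
struct HelperRange {
  std::size_t begin;
  std::size_t end;
};

struct EarlyDeleteBook {
  std::vector<BranchCount> branchToCount;           // one entry per early-delete branch
  std::vector<unsigned int> helperToBranchIndices;  // flat list of indexes into branchToCount
  std::vector<HelperRange> helpers;                 // one range per reading module

  // Register a module that reads the given branches; returns its helper index.
  std::size_t addReader(std::vector<unsigned int> const& branchIndices) {
    std::size_t const begin = helperToBranchIndices.size();
    helperToBranchIndices.insert(helperToBranchIndices.end(), branchIndices.begin(), branchIndices.end());
    helpers.push_back({begin, helperToBranchIndices.size()});
    return helpers.size() - 1;
  }

  // Rebuild all counts with one pass over the flat vector (assumed per-event reset).
  void reset() {
    for (auto& b : branchToCount) {
      b.count = 0;
    }
    for (unsigned int index : helperToBranchIndices) {
      ++branchToCount[index].count;
    }
  }

  // A module finished: decrement its branches and report those with no readers left.
  std::vector<std::string> moduleDone(std::size_t helper) {
    std::vector<std::string> deletable;
    HelperRange const r = helpers[helper];
    for (std::size_t i = r.begin; i != r.end; ++i) {
      BranchCount& b = branchToCount[helperToBranchIndices[i]];
      if (--b.count == 0) {
        deletable.push_back(b.branch);
      }
    }
    return deletable;
  }
};

// Example setup: helper 0 reads both branches, helper 1 reads only "tracks".
EarlyDeleteBook makeExampleBook() {
  EarlyDeleteBook book;
  book.branchToCount = {{"tracks", 0u}, {"hits", 0u}};
  book.addReader({0u, 1u});
  book.addReader({0u});
  book.reset();
  return book;
}
```

Keeping all ranges in one contiguous vector means the reset is a single linear pass, which matches the allocation and iteration benefits the source comment mentions.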