CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
88 
89 #include <map>
90 #include <memory>
91 #include <set>
92 #include <string>
93 #include <vector>
94 #include <sstream>
95 #include <atomic>
96 
97 namespace edm {
98 
99  class ActivityRegistry;
100  class BranchIDListHelper;
101  class EventSetupImpl;
102  class ExceptionCollector;
103  class ExceptionToActionTable;
104  class OutputModuleCommunicator;
105  class ProcessContext;
107  class WorkerInPath;
108  class ModuleRegistry;
109  class TriggerResultInserter;
110  class PathStatusInserter;
111  class EndPathStatusInserter;
112  class PreallocationConfiguration;
113  class WaitingTaskHolder;
114 
115  namespace service {
116  class TriggerNamesService;
117  }
118 
119  namespace {
120  template <typename T>
121  class StreamScheduleSignalSentry {
122  public:
123  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
124  : a_(a), context_(context), allowThrow_(false) {
125  if (a_)
126  T::preScheduleSignal(a_, context_);
127  }
128  ~StreamScheduleSignalSentry() noexcept(false) {
129  try {
130  if (a_) {
131  T::postScheduleSignal(a_, context_);
132  }
133  } catch (...) {
134  if (allowThrow_) {
135  throw;
136  }
137  }
138  }
139 
140  void allowThrow() { allowThrow_ = true; }
141 
142  private:
143  // We own none of these resources.
144  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
145  typename T::Context const* context_;
146  bool allowThrow_;
147  };
148  } // namespace
149 
151  public:
152  typedef std::vector<std::string> vstring;
153  typedef std::vector<Path> TrigPaths;
154  typedef std::vector<Path> NonTrigPaths;
155  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
156  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
157  typedef std::shared_ptr<Worker> WorkerPtr;
158  typedef std::vector<Worker*> AllWorkers;
159 
160  typedef std::vector<Worker*> Workers;
161 
162  typedef std::vector<WorkerInPath> PathWorkers;
163 
164  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
165  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
166  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
167  std::shared_ptr<ModuleRegistry>,
168  ParameterSet& proc_pset,
169  service::TriggerNamesService const& tns,
170  PreallocationConfiguration const& prealloc,
171  ProductRegistry& pregistry,
172  BranchIDListHelper& branchIDListHelper,
174  std::shared_ptr<ActivityRegistry> areg,
175  std::shared_ptr<ProcessConfiguration> processConfiguration,
176  bool allowEarlyDelete,
177  StreamID streamID,
178  ProcessContext const* processContext);
179 
180  StreamSchedule(StreamSchedule const&) = delete;
181 
182  void processOneEventAsync(
183  WaitingTaskHolder iTask,
184  EventPrincipal& ep,
185  EventSetupImpl const& es,
186  ServiceToken const& token,
187  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
188 
189  template <typename T>
190  void processOneStreamAsync(WaitingTaskHolder iTask,
191  typename T::MyPrincipal& principal,
192  EventSetupImpl const& eventSetup,
193  ServiceToken const& token,
194  bool cleaningUpAfterException = false);
195 
196  void beginStream();
197  void endStream();
198 
199  StreamID streamID() const { return streamID_; }
200 
203 
207  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
208 
210  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
211 
213  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
214 
215  void moduleDescriptionsInPath(std::string const& iPathLabel,
216  std::vector<ModuleDescription const*>& descriptions,
217  unsigned int hint) const;
218 
219  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
220  std::vector<ModuleDescription const*>& descriptions,
221  unsigned int hint) const;
222 
226  int totalEvents() const { return total_events_; }
227 
230  int totalEventsPassed() const { return total_passed_; }
231 
234  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
235 
238  void enableEndPaths(bool active);
239 
242  bool endPathsEnabled() const;
243 
246  void getTriggerReport(TriggerReport& rep) const;
247 
249  void clearCounters();
250 
252  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
253 
255  AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
256 
257  unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
258 
259  StreamContext const& context() const { return streamContext_; }
260 
261  private:
262  //Sentry class to only send a signal if an
263  // exception occurs. An exception is identified
264  // by the destructor being called without first
265  // calling completedSuccessfully().
267  public:
269  : reg_(iReg), context_(iContext) {}
271  if (reg_) {
272  reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
273  }
274  }
275  void completedSuccessfully() { reg_ = nullptr; }
276 
277  private:
278  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
280  };
281 
283  ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
284 
285  void resetAll();
286 
287  void finishedPaths(std::atomic<std::exception_ptr*>&,
289  EventPrincipal& ep,
290  EventSetupImpl const& es);
291  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
292 
293  void reportSkipped(EventPrincipal const& ep) const;
294 
295  void fillWorkers(ParameterSet& proc_pset,
296  ProductRegistry& preg,
297  PreallocationConfiguration const* prealloc,
298  std::shared_ptr<ProcessConfiguration const> processConfiguration,
299  std::string const& name,
300  bool ignoreFilters,
301  PathWorkers& out,
302  std::vector<std::string> const& endPathNames);
303  void fillTrigPath(ParameterSet& proc_pset,
304  ProductRegistry& preg,
305  PreallocationConfiguration const* prealloc,
306  std::shared_ptr<ProcessConfiguration const> processConfiguration,
307  int bitpos,
308  std::string const& name,
309  TrigResPtr,
310  std::vector<std::string> const& endPathNames);
311  void fillEndPath(ParameterSet& proc_pset,
312  ProductRegistry& preg,
313  PreallocationConfiguration const* prealloc,
314  std::shared_ptr<ProcessConfiguration const> processConfiguration,
315  int bitpos,
316  std::string const& name,
317  std::vector<std::string> const& endPathNames);
318 
319  void addToAllWorkers(Worker* w);
320 
321  void resetEarlyDelete();
322  void initializeEarlyDelete(ModuleRegistry& modReg,
323  edm::ParameterSet const& opts,
324  edm::ProductRegistry const& preg,
325  bool allowEarlyDelete);
326 
327  TrigResConstPtr results() const { return get_underlying_safe(results_); }
328  TrigResPtr& results() { return get_underlying_safe(results_); }
329 
330  void makePathStatusInserters(
331  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
332  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
334 
336  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
337 
339 
341  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
342  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
343 
344  TrigPaths trig_paths_;
345  TrigPaths end_paths_;
346  std::vector<int> empty_trig_paths_;
347  std::vector<int> empty_end_paths_;
348 
349  //For each branch that has been marked for early deletion
350  // keep track of how many modules are left that read this data but have
351  // not yet been run in this event
352  std::vector<BranchToCount> earlyDeleteBranchToCount_;
353  //NOTE the following is effectively internal data for each EarlyDeleteHelper
354  // but putting it into one vector makes for better allocation as well as
355  // faster iteration when used to reset the earlyDeleteBranchToCount_
356  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
357  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
358  // tell which EarlyDeleteHelper is associated with which BranchIDs.
359  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
360  //There is one EarlyDeleteHelper per Module which are reading data that
361  // has been marked for early deletion
362  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
363 
367 
370  volatile bool endpathsAreActive_;
371  std::atomic<bool> skippingEvent_;
372  };
373 
374  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
375  Service<JobReport> reportSvc;
376  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
377  }
378 
379  template <typename T>
381  typename T::MyPrincipal& ep,
382  EventSetupImpl const& es,
383  ServiceToken const& token,
384  bool cleaningUpAfterException) {
385  T::setStreamContext(streamContext_, ep);
386 
387  auto id = ep.id();
388  auto doneTask = make_waiting_task(
389  tbb::task::allocate_root(),
390  [this, iHolder, id, cleaningUpAfterException, token](std::exception_ptr const* iPtr) mutable {
391  std::exception_ptr excpt;
392  if (iPtr) {
393  excpt = *iPtr;
394  //add context information to the exception and print message
395  try {
396  convertException::wrap([&]() { std::rethrow_exception(excpt); });
397  } catch (cms::Exception& ex) {
398  //TODO: should add the transition type info
399  std::ostringstream ost;
400  if (ex.context().empty()) {
401  ost << "Processing " << T::transitionName() << " " << id;
402  }
403  ServiceRegistry::Operate op(token);
404  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
405  excpt = std::current_exception();
406  }
407 
408  ServiceRegistry::Operate op(token);
409  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
410  }
411 
412  try {
413  ServiceRegistry::Operate op(token);
414  T::postScheduleSignal(actReg_.get(), &streamContext_);
415  } catch (...) {
416  if (not excpt) {
417  excpt = std::current_exception();
418  }
419  }
420  iHolder.doneWaiting(excpt);
421  });
422 
423  auto task = make_functor_task(tbb::task::allocate_root(),
424  [this, doneTask, h = WaitingTaskHolder(doneTask), &ep, &es, token]() mutable {
425  ServiceRegistry::Operate op(token);
426  try {
427  T::preScheduleSignal(actReg_.get(), &streamContext_);
428 
429  workerManager_.resetAll();
430  } catch (...) {
431  h.doneWaiting(std::current_exception());
432  return;
433  }
434 
435  for (auto& p : end_paths_) {
436  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
437  }
438 
439  for (auto& p : trig_paths_) {
440  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
441  }
442 
443  workerManager_.processOneOccurrenceAsync<T>(
444  doneTask, ep, es, token, streamID_, &streamContext_, &streamContext_);
445  });
446 
447  if (streamID_.value() == 0) {
448  //Enqueueing will start another thread if there is only
449  // one thread in the job. Having stream == 0 use spawn
450  // avoids starting up another thread when there is only one stream.
451  tbb::task::spawn(*task);
452  } else {
453  tbb::task::enqueue(*task);
454  }
455  }
456 } // namespace edm
457 
458 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:181
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetupImpl const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
const double w
Definition: UKUtility.cc:23
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:147
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1190
#define noexcept
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:121
StreamContext const & context() const
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_