CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
88 
89 #include <map>
90 #include <memory>
91 #include <set>
92 #include <string>
93 #include <vector>
94 #include <sstream>
95 #include <atomic>
96 
97 namespace edm {
98 
99  class ActivityRegistry;
100  class BranchIDListHelper;
101  class EventSetup;
102  class ExceptionCollector;
103  class ExceptionToActionTable;
104  class OutputModuleCommunicator;
105  class ProcessContext;
107  class WorkerInPath;
108  class ModuleRegistry;
109  class TriggerResultInserter;
110  class PathStatusInserter;
111  class EndPathStatusInserter;
112  class PreallocationConfiguration;
113  class WaitingTaskHolder;
114 
115  namespace service {
116  class TriggerNamesService;
117  }
118 
119  namespace {
120  template <typename T>
121  class StreamScheduleSignalSentry {
122  public:
123  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
124  a_(a), context_(context), allowThrow_(false) {
125  if (a_) T::preScheduleSignal(a_, context_);
126  }
127  ~StreamScheduleSignalSentry() noexcept(false) {
128  try {
129  if (a_) { T::postScheduleSignal(a_, context_); }
130  } catch(...) {
131  if(allowThrow_) {throw;}
132  }
133  }
134 
135  void allowThrow() {
136  allowThrow_ = true;
137  }
138 
139  private:
140  // We own none of these resources.
141  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
142  typename T::Context const* context_;
143  bool allowThrow_;
144  };
145  }
146 
148  public:
149  typedef std::vector<std::string> vstring;
150  typedef std::vector<Path> TrigPaths;
151  typedef std::vector<Path> NonTrigPaths;
152  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
153  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
154  typedef std::shared_ptr<Worker> WorkerPtr;
155  typedef std::vector<Worker*> AllWorkers;
156 
157  typedef std::vector<Worker*> Workers;
158 
159  typedef std::vector<WorkerInPath> PathWorkers;
160 
161  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
162  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
163  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
164  std::shared_ptr<ModuleRegistry>,
165  ParameterSet& proc_pset,
166  service::TriggerNamesService const& tns,
167  PreallocationConfiguration const& prealloc,
168  ProductRegistry& pregistry,
169  BranchIDListHelper& branchIDListHelper,
171  std::shared_ptr<ActivityRegistry> areg,
172  std::shared_ptr<ProcessConfiguration> processConfiguration,
173  bool allowEarlyDelete,
174  StreamID streamID,
175  ProcessContext const* processContext);
176 
177  StreamSchedule(StreamSchedule const&) = delete;
178 
179  void processOneEventAsync(WaitingTaskHolder iTask,
180  EventPrincipal& ep,
181  EventSetup const& es,
182  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
183 
184  template <typename T>
185  void processOneStream(typename T::MyPrincipal& principal,
186  EventSetup const& eventSetup,
187  bool cleaningUpAfterException = false);
188 
189  template <typename T>
190  void processOneStreamAsync(WaitingTaskHolder iTask,
191  typename T::MyPrincipal& principal,
192  EventSetup const& eventSetup,
193  bool cleaningUpAfterException = false);
194 
195  void beginStream();
196  void endStream();
197 
198  StreamID streamID() const { return streamID_; }
199 
202 
206  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
207 
209  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
210 
212  void modulesInPath(std::string const& iPathLabel,
213  std::vector<std::string>& oLabelsToFill) const;
214 
215  void moduleDescriptionsInPath(std::string const& iPathLabel,
216  std::vector<ModuleDescription const*>& descriptions,
217  unsigned int hint) const;
218 
219  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
220  std::vector<ModuleDescription const*>& descriptions,
221  unsigned int hint) const;
222 
226  int totalEvents() const {
227  return total_events_;
228  }
229 
232  int totalEventsPassed() const {
233  return total_passed_;
234  }
235 
238  int totalEventsFailed() const {
239  return totalEvents() - totalEventsPassed();
240  }
241 
244  void enableEndPaths(bool active);
245 
248  bool endPathsEnabled() const;
249 
252  void getTriggerReport(TriggerReport& rep) const;
253 
255  void clearCounters();
256 
258  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
259 
261  AllWorkers const& allWorkers() const {
262  return workerManager_.allWorkers();
263  }
264 
265  unsigned int numberOfUnscheduledModules() const {
266  return number_of_unscheduled_modules_;
267  }
268 
269  StreamContext const& context() const { return streamContext_;}
270  private:
271  //Sentry class to only send a signal if an
272  // exception occurs. An exception is identified
273  // by the destructor being called without first
274  // calling completedSuccessfully().
276  public:
278  reg_(iReg),
279  context_(iContext){}
281  if(reg_) {
282  reg_->preStreamEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
283  }
284  }
286  reg_ = nullptr;
287  }
288  private:
289  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
291  };
292 
295  return workerManager_.actionTable();
296  }
297 
298 
299  void resetAll();
300 
301  void finishedPaths(std::exception_ptr, WaitingTaskHolder,
302  EventPrincipal& ep, EventSetup const& es);
303  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
304 
305  template <typename T>
306  bool runTriggerPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
307 
308  template <typename T>
309  void runEndPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
310 
311  void reportSkipped(EventPrincipal const& ep) const;
312 
313  void fillWorkers(ParameterSet& proc_pset,
314  ProductRegistry& preg,
315  PreallocationConfiguration const* prealloc,
316  std::shared_ptr<ProcessConfiguration const> processConfiguration,
317  std::string const& name, bool ignoreFilters, PathWorkers& out,
318  std::vector<std::string> const& endPathNames);
319  void fillTrigPath(ParameterSet& proc_pset,
320  ProductRegistry& preg,
321  PreallocationConfiguration const* prealloc,
322  std::shared_ptr<ProcessConfiguration const> processConfiguration,
323  int bitpos, std::string const& name, TrigResPtr,
324  std::vector<std::string> const& endPathNames);
325  void fillEndPath(ParameterSet& proc_pset,
326  ProductRegistry& preg,
327  PreallocationConfiguration const* prealloc,
328  std::shared_ptr<ProcessConfiguration const> processConfiguration,
329  int bitpos, std::string const& name,
330  std::vector<std::string> const& endPathNames);
331 
332  void addToAllWorkers(Worker* w);
333 
334  void resetEarlyDelete();
335  void initializeEarlyDelete(ModuleRegistry & modReg,
336  edm::ParameterSet const& opts,
337  edm::ProductRegistry const& preg,
338  bool allowEarlyDelete);
339 
340  TrigResConstPtr results() const {return get_underlying_safe(results_);}
341  TrigResPtr& results() {return get_underlying_safe(results_);}
342 
343  void makePathStatusInserters(
344  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
345  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
347 
349  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
350 
352 
354  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
355  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
356 
357  TrigPaths trig_paths_;
358  TrigPaths end_paths_;
359  std::vector<int> empty_trig_paths_;
360  std::vector<int> empty_end_paths_;
361 
362  //For each branch that has been marked for early deletion
363  // keep track of how many modules are left that read this data but have
364  // not yet been run in this event
365  std::vector<BranchToCount> earlyDeleteBranchToCount_;
366  //NOTE the following is effectively internal data for each EarlyDeleteHelper
367  // but putting it into one vector makes for better allocation as well as
368  // faster iteration when used to reset the earlyDeleteBranchToCount_
369  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
370  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
371  // tell which EarlyDeleteHelper is associated with which BranchIDs.
372  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
373  //There is one EarlyDeleteHelper per Module which are reading data that
374  // has been marked for early deletion
375  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
376 
380 
383  volatile bool endpathsAreActive_;
384  std::atomic<bool> skippingEvent_;
385  };
386 
387  void
388  inline
390  Service<JobReport> reportSvc;
391  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
392  }
393 
394  template <typename T>
395  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
396  EventSetup const& es,
397  bool cleaningUpAfterException) {
398  this->resetAll();
399 
400  T::setStreamContext(streamContext_, ep);
401  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
402 
403  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
404 
405  // This call takes care of the unscheduled processing.
406  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
407 
408  try {
409  convertException::wrap([&]() {
410  runTriggerPaths<T>(ep, es, &streamContext_);
411 
412  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
413  });
414  }
415  catch(cms::Exception& ex) {
416  if (ex.context().empty()) {
417  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
418  } else {
419  addContextAndPrintException("", ex, cleaningUpAfterException);
420  }
421  throw;
422  }
423  terminationSentry.completedSuccessfully();
424 
425  //If we got here no other exception has happened so we can propogate any Service related exceptions
426  sentry.allowThrow();
427  }
428 
429  template <typename T>
431  typename T::MyPrincipal& ep,
432  EventSetup const& es,
433  bool cleaningUpAfterException) {
435 
436  T::setStreamContext(streamContext_, ep);
437 
438  auto id = ep.id();
439  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
440  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
441  {
442  ServiceRegistry::Operate op(token);
443  std::exception_ptr excpt;
444  if(iPtr) {
445  excpt = *iPtr;
446  //add context information to the exception and print message
447  try {
448  convertException::wrap([&]() {
449  std::rethrow_exception(excpt);
450  });
451  } catch(cms::Exception& ex) {
452  //TODO: should add the transition type info
453  std::ostringstream ost;
454  if(ex.context().empty()) {
455  ost<<"Processing "<<T::transitionName()<<" "<<id;
456  }
457  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
458  excpt = std::current_exception();
459  }
460 
461  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
462  }
463 
464  try {
465  T::postScheduleSignal(actReg_.get(), &streamContext_);
466  } catch(...) {
467  if(not excpt) {
468  excpt = std::current_exception();
469  }
470  }
471  iHolder.doneWaiting(excpt);
472 
473  });
474 
475  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask,&ep,&es,cleaningUpAfterException,token] () mutable {
476  ServiceRegistry::Operate op(token);
477  T::preScheduleSignal(actReg_.get(), &streamContext_);
478  WaitingTaskHolder h(doneTask);
479 
480  workerManager_.resetAll();
481  for(auto& p : end_paths_) {
482  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
483  }
484 
485  for(auto& p : trig_paths_) {
486  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
487  }
488 
489  workerManager_.processOneOccurrenceAsync<T>(doneTask,
490  ep, es, streamID_, &streamContext_, &streamContext_);
491  });
492 
493  if(streamID_.value() == 0) {
494  //Enqueueing will start another thread if there is only
495  // one thread in the job. Having stream == 0 use spawn
496  // avoids starting up another thread when there is only one stream.
497  tbb::task::spawn( *task);
498  } else {
499  tbb::task::enqueue( *task);
500  }
501  }
502 
503 
504  template <typename T>
505  bool
506  StreamSchedule::runTriggerPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
507  for(auto& p : trig_paths_) {
508  p.processOneOccurrence<T>(ep, es, streamID_, context);
509  }
510  return results_->accept();
511  }
512 
513  template <typename T>
514  void
515  StreamSchedule::runEndPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
516  // Note there is no state-checking safety controlling the
517  // activation/deactivation of endpaths.
518  for(auto& p : end_paths_) {
519  p.processOneOccurrence<T>(ep, es, streamID_, context);
520  }
521  }
522 }
523 
524 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
const double w
Definition: UKUtility.cc:23
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
ServiceToken presentToken() const
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1188
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:121
StreamContext const & context() const
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_