CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11 of events by modules and paths is contained here or in objects held
12 by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
87 
88 #include <map>
89 #include <memory>
90 #include <set>
91 #include <string>
92 #include <vector>
93 #include <sstream>
94 #include <atomic>
95 
96 namespace edm {
97 
98  class ActivityRegistry;
99  class BranchIDListHelper;
100  class EventSetup;
101  class ExceptionCollector;
102  class OutputModuleCommunicator;
103  class ProcessContext;
105  class WorkerInPath;
106  class ModuleRegistry;
107  class TriggerResultInserter;
108  class PreallocationConfiguration;
109  class WaitingTaskHolder;
110 
111  namespace service {
112  class TriggerNamesService;
113  }
114 
115  namespace {
116  template <typename T>
117  class StreamScheduleSignalSentry {
118  public:
119  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
120  a_(a), context_(context), allowThrow_(false) {
121  if (a_) T::preScheduleSignal(a_, context_);
122  }
123  ~StreamScheduleSignalSentry() noexcept(false) {
124  try {
125  if (a_) { T::postScheduleSignal(a_, context_); }
126  } catch(...) {
127  if(allowThrow_) {throw;}
128  }
129  }
130 
131  void allowThrow() {
132  allowThrow_ = true;
133  }
134 
135  private:
136  // We own none of these resources.
137  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
138  typename T::Context const* context_;
139  bool allowThrow_;
140  };
141  }
142 
144  public:
145  typedef std::vector<std::string> vstring;
146  typedef std::vector<Path> TrigPaths;
147  typedef std::vector<Path> NonTrigPaths;
148  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
149  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
150  typedef std::shared_ptr<Worker> WorkerPtr;
151  typedef std::vector<Worker*> AllWorkers;
152 
153  typedef std::vector<Worker*> Workers;
154 
155  typedef std::vector<WorkerInPath> PathWorkers;
156 
157  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
158  std::shared_ptr<ModuleRegistry>,
159  ParameterSet& proc_pset,
161  PreallocationConfiguration const& prealloc,
162  ProductRegistry& pregistry,
163  BranchIDListHelper& branchIDListHelper,
165  std::shared_ptr<ActivityRegistry> areg,
166  std::shared_ptr<ProcessConfiguration> processConfiguration,
167  bool allowEarlyDelete,
168  StreamID streamID,
169  ProcessContext const* processContext);
170 
171  StreamSchedule(StreamSchedule const&) = delete;
172 
173  void processOneEventAsync(WaitingTaskHolder iTask,
174  EventPrincipal& ep,
175  EventSetup const& es);
176 
177  template <typename T>
178  void processOneStream(typename T::MyPrincipal& principal,
179  EventSetup const& eventSetup,
180  bool cleaningUpAfterException = false);
181 
182  template <typename T>
183  void processOneStreamAsync(WaitingTaskHolder iTask,
184  typename T::MyPrincipal& principal,
185  EventSetup const& eventSetup,
186  bool cleaningUpAfterException = false);
187 
188  void beginStream();
189  void endStream();
190 
191  StreamID streamID() const { return streamID_; }
192 
195 
199  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
200 
202  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
203 
207  void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
208 
210  void endPaths(std::vector<std::string>& oLabelsToFill) const;
211 
213  void modulesInPath(std::string const& iPathLabel,
214  std::vector<std::string>& oLabelsToFill) const;
215 
216  void moduleDescriptionsInPath(std::string const& iPathLabel,
217  std::vector<ModuleDescription const*>& descriptions,
218  unsigned int hint) const;
219 
220  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
221  std::vector<ModuleDescription const*>& descriptions,
222  unsigned int hint) const;
223 
227  int totalEvents() const {
228  return total_events_;
229  }
230 
233  int totalEventsPassed() const {
234  return total_passed_;
235  }
236 
239  int totalEventsFailed() const {
240  return totalEvents() - totalEventsPassed();
241  }
242 
245  void enableEndPaths(bool active);
246 
249  bool endPathsEnabled() const;
250 
253  void getTriggerReport(TriggerReport& rep) const;
254 
256  void clearCounters();
257 
259  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
260 
262  AllWorkers const& allWorkers() const {
263  return workerManager_.allWorkers();
264  }
265 
266  unsigned int numberOfUnscheduledModules() const {
267  return number_of_unscheduled_modules_;
268  }
269 
270  StreamContext const& context() const { return streamContext_;}
271  private:
272  //Sentry class to only send a signal if an
273  // exception occurs. An exception is identified
274  // by the destructor being called without first
275  // calling completedSuccessfully().
277  public:
279  reg_(iReg),
280  context_(iContext){}
282  if(reg_) {
283  reg_->preStreamEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
284  }
285  }
287  reg_ = nullptr;
288  }
289  private:
290  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
292  };
293 
296  return workerManager_.actionTable();
297  }
298 
299 
300  void resetAll();
301 
302  void finishedPaths(std::exception_ptr, WaitingTaskHolder,
303  EventPrincipal& ep, EventSetup const& es);
304  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
305 
306  template <typename T>
307  bool runTriggerPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
308 
309  template <typename T>
310  void runEndPaths(typename T::MyPrincipal const&, EventSetup const&, typename T::Context const*);
311 
312  void reportSkipped(EventPrincipal const& ep) const;
313 
314  void fillWorkers(ParameterSet& proc_pset,
315  ProductRegistry& preg,
316  PreallocationConfiguration const* prealloc,
317  std::shared_ptr<ProcessConfiguration const> processConfiguration,
318  std::string const& name, bool ignoreFilters, PathWorkers& out,
319  vstring* labelsOnPaths);
320  void fillTrigPath(ParameterSet& proc_pset,
321  ProductRegistry& preg,
322  PreallocationConfiguration const* prealloc,
323  std::shared_ptr<ProcessConfiguration const> processConfiguration,
324  int bitpos, std::string const& name, TrigResPtr,
325  vstring* labelsOnTriggerPaths);
326  void fillEndPath(ParameterSet& proc_pset,
327  ProductRegistry& preg,
328  PreallocationConfiguration const* prealloc,
329  std::shared_ptr<ProcessConfiguration const> processConfiguration,
330  int bitpos, std::string const& name);
331 
332  void addToAllWorkers(Worker* w);
333 
334  void resetEarlyDelete();
335  void initializeEarlyDelete(ModuleRegistry & modReg,
336  edm::ParameterSet const& opts,
337  edm::ProductRegistry const& preg,
338  bool allowEarlyDelete);
339 
340  TrigResConstPtr results() const {return get_underlying_safe(results_);}
341  TrigResPtr& results() {return get_underlying_safe(results_);}
342 
344  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
345 
348 
350 
352  TrigPaths trig_paths_;
353  TrigPaths end_paths_;
354  std::vector<int> empty_trig_paths_;
356 
357  //For each branch that has been marked for early deletion
358  // keep track of how many modules are left that read this data but have
359  // not yet been run in this event
360  std::vector<BranchToCount> earlyDeleteBranchToCount_;
361  //NOTE the following is effectively internal data for each EarlyDeleteHelper
362  // but putting it into one vector makes for better allocation as well as
363  // faster iteration when used to reset the earlyDeleteBranchToCount_
364 // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
365 // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
366 // they tell which EarlyDeleteHelper is associated with which BranchIDs.
367  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
368 //There is one EarlyDeleteHelper for each Module which reads data that
369 // has been marked for early deletion
370  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
371 
375 
// Gate checked by processOneStream() before it runs the end paths.
// Declared std::atomic rather than volatile: volatile provides no
// inter-thread synchronization, whereas atomic loads and stores are race-free.
// NOTE(review): presumably written by enableEndPaths(); its definition is
// not part of this header - confirm the .cc only assigns/reads the flag.
std::atomic<bool> endpathsAreActive_;
379  std::atomic<bool> skippingEvent_;
380  };
381 
382  void
383  inline
385  Service<JobReport> reportSvc;
386  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
387  }
388 
389  template <typename T>
390  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
391  EventSetup const& es,
392  bool cleaningUpAfterException) {
393  this->resetAll();
394 
395  T::setStreamContext(streamContext_, ep);
396  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &streamContext_);
397 
398  SendTerminationSignalIfException terminationSentry(actReg_.get(), &streamContext_);
399 
400  // This call takes care of the unscheduled processing.
401  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
402 
403  try {
404  convertException::wrap([&]() {
405  runTriggerPaths<T>(ep, es, &streamContext_);
406 
407  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
408  });
409  }
410  catch(cms::Exception& ex) {
411  if (ex.context().empty()) {
412  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
413  } else {
414  addContextAndPrintException("", ex, cleaningUpAfterException);
415  }
416  throw;
417  }
418  terminationSentry.completedSuccessfully();
419 
420  //If we got here no other exception has happened so we can propogate any Service related exceptions
421  sentry.allowThrow();
422  }
423 
424  template <typename T>
426  typename T::MyPrincipal& ep,
427  EventSetup const& es,
428  bool cleaningUpAfterException) {
430 
431  T::setStreamContext(streamContext_, ep);
432 
433  auto id = ep.id();
434  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
435  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
436  {
437  ServiceRegistry::Operate op(token);
438  std::exception_ptr excpt;
439  if(iPtr) {
440  excpt = *iPtr;
441  //add context information to the exception and print message
442  try {
443  convertException::wrap([&]() {
444  std::rethrow_exception(excpt);
445  });
446  } catch(cms::Exception& ex) {
447  //TODO: should add the transition type info
448  std::ostringstream ost;
449  if(ex.context().empty()) {
450  ost<<"Processing "<<T::transitionName()<<" "<<id;
451  }
452  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
453  excpt = std::current_exception();
454  }
455 
456  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
457  }
458 
459  try {
460  T::postScheduleSignal(actReg_.get(), &streamContext_);
461  } catch(...) {
462  if(not excpt) {
463  excpt = std::current_exception();
464  }
465  }
466  iHolder.doneWaiting(excpt);
467 
468  });
469 
470  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask,&ep,&es,cleaningUpAfterException,token] () mutable {
471  ServiceRegistry::Operate op(token);
472  T::preScheduleSignal(actReg_.get(), &streamContext_);
473  WaitingTaskHolder h(doneTask);
474 
475  workerManager_.resetAll();
476  for(auto& p : end_paths_) {
477  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
478  }
479 
480  for(auto& p : trig_paths_) {
481  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
482  }
483 
484  workerManager_.processOneOccurrenceAsync<T>(doneTask,
485  ep, es, streamID_, &streamContext_, &streamContext_);
486  });
487 
488  if(streamID_.value() == 0) {
489  //Enqueueing will start another thread if there is only
490  // one thread in the job. Having stream == 0 use spawn
491  // avoids starting up another thread when there is only one stream.
492  tbb::task::spawn( *task);
493  } else {
494  tbb::task::enqueue( *task);
495  }
496  }
497 
498 
499  template <typename T>
500  bool
501  StreamSchedule::runTriggerPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
502  for(auto& p : trig_paths_) {
503  p.processOneOccurrence<T>(ep, es, streamID_, context);
504  }
505  return results_->accept();
506  }
507 
508  template <typename T>
509  void
510  StreamSchedule::runEndPaths(typename T::MyPrincipal const& ep, EventSetup const& es, typename T::Context const* context) {
511  // Note there is no state-checking safety controlling the
512  // activation/deactivation of endpaths.
513  for(auto& p : end_paths_) {
514  p.processOneOccurrence<T>(ep, es, streamID_, context);
515  }
516  }
517 }
518 
519 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
const double w
Definition: UKUtility.cc:23
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
ServiceToken presentToken() const
void runEndPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1188
vstring empty_trig_path_names_
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:121
StreamContext const & context() const
auto wrap(F iFunc) -> decltype(iFunc())
bool runTriggerPaths(typename T::MyPrincipal const &, EventSetup const &, typename T::Context const *)
long double T
static std::string const triggerPaths
Definition: EdmProvDump.cc:42
TrigResConstPtr results() const
std::vector< Path > TrigPaths
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_