CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in objects held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
88 
89 #include <map>
90 #include <memory>
91 #include <set>
92 #include <string>
93 #include <vector>
94 #include <sstream>
95 #include <atomic>
96 
97 namespace edm {
98 
99  class ActivityRegistry;
100  class BranchIDListHelper;
101  class EventSetup;
102  class ExceptionCollector;
103  class ExceptionToActionTable;
104  class OutputModuleCommunicator;
105  class ProcessContext;
107  class WorkerInPath;
108  class ModuleRegistry;
109  class TriggerResultInserter;
110  class PathStatusInserter;
111  class EndPathStatusInserter;
112  class PreallocationConfiguration;
113  class WaitingTaskHolder;
114 
115  namespace service {
116  class TriggerNamesService;
117  }
118 
// NOTE(review): this is an unnamed namespace inside a header (see the include guard at the top),
// so each translation unit that includes the file gets its own internal-linkage copy of the
// template below -- confirm that this is intended.
119  namespace {
  // Sentry that brackets a scope with the schedule signals of the traits type T:
  // the constructor calls T::preScheduleSignal and the destructor calls T::postScheduleSignal.
  // Both calls are skipped when the ActivityRegistry pointer is null.
  // An exception thrown by the post signal is swallowed unless allowThrow() was called beforehand.
120  template <typename T>
121  class StreamScheduleSignalSentry {
122  public:
  // a:       registry handed to the signal functions; may be null, in which case no signal is sent.
  // context: passed unchanged to both T::preScheduleSignal and T::postScheduleSignal.
123  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
124  a_(a), context_(context), allowThrow_(false) {
125  if (a_) T::preScheduleSignal(a_, context_);
126  }
  // Declared noexcept(false) so that the rethrow below can leave the destructor
  // when allowThrow_ has been set.
127  ~StreamScheduleSignalSentry() noexcept(false) {
128  try {
129  if (a_) { T::postScheduleSignal(a_, context_); }
130  } catch(...) {
  // Only propagate if the owner opted in via allowThrow(); otherwise the exception is dropped here.
131  if(allowThrow_) {throw;}
132  }
133  }
134 
  // Opt in to letting an exception from the post signal escape the destructor.
135  void allowThrow() {
136  allowThrow_ = true;
137  }
138 
139  private:
140  // We own none of these resources.
141  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
142  typename T::Context const* context_;
  // False after construction; set to true only by allowThrow().
143  bool allowThrow_;
144  };
145  }
146 
148  public:
149  typedef std::vector<std::string> vstring;
150  typedef std::vector<Path> TrigPaths;
151  typedef std::vector<Path> NonTrigPaths;
152  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
153  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
154  typedef std::shared_ptr<Worker> WorkerPtr;
155  typedef std::vector<Worker*> AllWorkers;
156 
157  typedef std::vector<Worker*> Workers;
158 
159  typedef std::vector<WorkerInPath> PathWorkers;
160 
161  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
162  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
163  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
164  std::shared_ptr<ModuleRegistry>,
165  ParameterSet& proc_pset,
166  service::TriggerNamesService const& tns,
167  PreallocationConfiguration const& prealloc,
168  ProductRegistry& pregistry,
169  BranchIDListHelper& branchIDListHelper,
171  std::shared_ptr<ActivityRegistry> areg,
172  std::shared_ptr<ProcessConfiguration> processConfiguration,
173  bool allowEarlyDelete,
174  StreamID streamID,
175  ProcessContext const* processContext);
176 
177  StreamSchedule(StreamSchedule const&) = delete;
178 
179  void processOneEventAsync(WaitingTaskHolder iTask,
180  EventPrincipal& ep,
181  EventSetup const& es,
182  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
183 
184  template <typename T>
185  void processOneStreamAsync(WaitingTaskHolder iTask,
186  typename T::MyPrincipal& principal,
187  EventSetup const& eventSetup,
188  bool cleaningUpAfterException = false);
189 
190  void beginStream();
191  void endStream();
192 
193  StreamID streamID() const { return streamID_; }
194 
197 
201  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
202 
204  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
205 
207  void modulesInPath(std::string const& iPathLabel,
208  std::vector<std::string>& oLabelsToFill) const;
209 
210  void moduleDescriptionsInPath(std::string const& iPathLabel,
211  std::vector<ModuleDescription const*>& descriptions,
212  unsigned int hint) const;
213 
214  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
215  std::vector<ModuleDescription const*>& descriptions,
216  unsigned int hint) const;
217 
// Returns the current value of the total_events_ counter.
// NOTE(review): presumably the number of events this stream has attempted to process -- confirm
// against the code that increments total_events_.
221  int totalEvents() const {
222  return total_events_;
223  }
224 
// Returns the current value of the total_passed_ counter.
// NOTE(review): presumably the number of events accepted by this stream -- confirm against the
// code that increments total_passed_.
227  int totalEventsPassed() const {
228  return total_passed_;
229  }
230 
// Derived value: totalEvents() minus totalEventsPassed(). No separate failure counter is read here.
233  int totalEventsFailed() const {
234  return totalEvents() - totalEventsPassed();
235  }
236 
239  void enableEndPaths(bool active);
240 
243  bool endPathsEnabled() const;
244 
247  void getTriggerReport(TriggerReport& rep) const;
248 
250  void clearCounters();
251 
253  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
254 
// Returns the collection of pointers to workers, forwarded from workerManager_.
256  AllWorkers const& allWorkers() const {
257  return workerManager_.allWorkers();
258  }
259 
260  unsigned int numberOfUnscheduledModules() const {
261  return number_of_unscheduled_modules_;
262  }
263 
264  StreamContext const& context() const { return streamContext_;}
265  private:
266  //Sentry class to only send a signal if an
267  // exception occurs. An exception is identified
268  // by the destructor being called without first
269  // calling completedSuccessfully().
271  public:
273  reg_(iReg),
274  context_(iContext){}
276  if(reg_) {
277  reg_->preStreamEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
278  }
279  }
281  reg_ = nullptr;
282  }
283  private:
284  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
286  };
287 
290  return workerManager_.actionTable();
291  }
292 
293 
294  void resetAll();
295 
296  void finishedPaths(std::exception_ptr, WaitingTaskHolder,
297  EventPrincipal& ep, EventSetup const& es);
298  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
299 
300  void reportSkipped(EventPrincipal const& ep) const;
301 
302  void fillWorkers(ParameterSet& proc_pset,
303  ProductRegistry& preg,
304  PreallocationConfiguration const* prealloc,
305  std::shared_ptr<ProcessConfiguration const> processConfiguration,
306  std::string const& name, bool ignoreFilters, PathWorkers& out,
307  std::vector<std::string> const& endPathNames);
308  void fillTrigPath(ParameterSet& proc_pset,
309  ProductRegistry& preg,
310  PreallocationConfiguration const* prealloc,
311  std::shared_ptr<ProcessConfiguration const> processConfiguration,
312  int bitpos, std::string const& name, TrigResPtr,
313  std::vector<std::string> const& endPathNames);
314  void fillEndPath(ParameterSet& proc_pset,
315  ProductRegistry& preg,
316  PreallocationConfiguration const* prealloc,
317  std::shared_ptr<ProcessConfiguration const> processConfiguration,
318  int bitpos, std::string const& name,
319  std::vector<std::string> const& endPathNames);
320 
321  void addToAllWorkers(Worker* w);
322 
323  void resetEarlyDelete();
324  void initializeEarlyDelete(ModuleRegistry & modReg,
325  edm::ParameterSet const& opts,
326  edm::ProductRegistry const& preg,
327  bool allowEarlyDelete);
328 
329  TrigResConstPtr results() const {return get_underlying_safe(results_);}
330  TrigResPtr& results() {return get_underlying_safe(results_);}
331 
332  void makePathStatusInserters(
333  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
334  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
336 
338  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
339 
341 
343  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
344  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
345 
346  TrigPaths trig_paths_;
347  TrigPaths end_paths_;
348  std::vector<int> empty_trig_paths_;
349  std::vector<int> empty_end_paths_;
350 
351  //For each branch that has been marked for early deletion
352  // keep track of how many modules are left that read this data but have
353  // not yet been run in this event
354  std::vector<BranchToCount> earlyDeleteBranchToCount_;
355  //NOTE the following is effectively internal data for each EarlyDeleteHelper
356  // but putting it into one vector makes for better allocation as well as
357  // faster iteration when used to reset the earlyDeleteBranchToCount_
358  // Each EarlyDeleteHelper holds a begin and end range into this vector. The values
359  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
360  // they tell which EarlyDeleteHelper is associated with which BranchIDs.
361  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
362  //There is one EarlyDeleteHelper for each module that reads data which
363  // has been marked for early deletion
364  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
365 
369 
// NOTE(review): `volatile` does not make reads/writes atomic or ordered between threads.
// If this flag is accessed from more than one thread it should probably be std::atomic<bool>,
// as skippingEvent_ on the next line already is -- confirm how it is used before changing the type.
372  volatile bool endpathsAreActive_;
373  std::atomic<bool> skippingEvent_;
374  };
375 
// Reports the run number and event number of `ep` to the JobReport service via reportSkippedEvent().
// NOTE(review): this listing drops the line between `inline` and the body; the class declares the
// member as `void reportSkipped(EventPrincipal const& ep) const`.
376  void
377  inline
379  Service<JobReport> reportSvc;
380  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
381  }
382 
// Runs the transition described by the traits type T over this stream's paths and workers
// without blocking: the work is packaged into a task that is spawned or enqueued at the end,
// and a second "done" task reports the outcome to iHolder.
//
// NOTE(review): this listing omits two lines of the definition -- the one carrying the function
// name and first parameter, and the one that initialises `token`. The in-class declaration reads
// processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal& principal,
//                       EventSetup const& eventSetup, bool cleaningUpAfterException = false).
//
// ep, es: captured by reference by the spawned task, so they must outlive it.
// cleaningUpAfterException: passed as the last argument of addContextAndPrintException.
383  template <typename T>
385  typename T::MyPrincipal& ep,
386  EventSetup const& es,
387  bool cleaningUpAfterException) {
389 
390  T::setStreamContext(streamContext_, ep);
391 
  // Copy the id now; the done task captures it by value for use in the error message.
392  auto id = ep.id();
  // Completion task. A non-null iPtr carries an exception from the work registered on this task.
  // NOTE(review): presumably invoked once everything holding doneTask has finished -- confirm
  // against WaitingTask / WaitingTaskHolder.
393  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
394  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
395  {
396  ServiceRegistry::Operate op(token);
397  std::exception_ptr excpt;
398  if(iPtr) {
399  excpt = *iPtr;
400  //add context information to the exception and print message
401  try {
402  convertException::wrap([&]() {
403  std::rethrow_exception(excpt);
404  });
405  } catch(cms::Exception& ex) {
406  //TODO: should add the transition type info
407  std::ostringstream ost;
  // The "Processing <transition> <id>" text is only built when the exception has no context yet;
  // addContextAndPrintException is called either way (with an empty string otherwise).
408  if(ex.context().empty()) {
409  ost<<"Processing "<<T::transitionName()<<" "<<id;
410  }
411  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
  // Replace the stored exception with the one currently being handled (ex, after the call above).
412  excpt = std::current_exception();
413  }
414 
  // Sent whenever an exception was delivered, whether or not the catch block above ran.
415  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
416  }
417 
  // The post signal is always attempted. An exception from it is kept only if
  // there is no earlier exception to report.
418  try {
419  T::postScheduleSignal(actReg_.get(), &streamContext_);
420  } catch(...) {
421  if(not excpt) {
422  excpt = std::current_exception();
423  }
424  }
  // Hand the result (a null exception_ptr on success) to the holder supplied by the caller.
425  iHolder.doneWaiting(excpt);
426 
427  });
428 
  // Work task: emits the pre signal, resets the workers and hands every path and the
  // worker manager to doneTask.
429  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask,&ep,&es,token] () mutable {
430  ServiceRegistry::Operate op(token);
431  T::preScheduleSignal(actReg_.get(), &streamContext_);
  // h is not used again in this lambda.
  // NOTE(review): presumably it keeps doneTask from running until this lambda returns -- confirm.
432  WaitingTaskHolder h(doneTask);
433 
434  workerManager_.resetAll();
  // End paths are handed off before trigger paths here.
435  for(auto& p : end_paths_) {
436  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
437  }
438 
439  for(auto& p : trig_paths_) {
440  p.runAllModulesAsync<T>(doneTask, ep, es, streamID_, &streamContext_);
441  }
442 
  // Both trailing context arguments are &streamContext_.
443  workerManager_.processOneOccurrenceAsync<T>(doneTask,
444  ep, es, streamID_, &streamContext_, &streamContext_);
445  });
446 
447  if(streamID_.value() == 0) {
448  //Enqueueing will start another thread if there is only
449  // one thread in the job. Having stream == 0 use spawn
450  // avoids starting up another thread when there is only one stream.
451  tbb::task::spawn( *task);
452  } else {
453  tbb::task::enqueue( *task);
454  }
455  }
456 }
457 
458 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
const double w
Definition: UKUtility.cc:23
FWCore Framework interface EventSetupRecordImplementation h
Helper function to determine trigger accepts.
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
ServiceToken presentToken() const
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1188
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:121
StreamContext const & context() const
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_