CMS 3D CMS Logo

StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
88 
89 #include <map>
90 #include <memory>
91 #include <set>
92 #include <string>
93 #include <vector>
94 #include <sstream>
95 #include <atomic>
96 
97 namespace edm {
98 
99  class ActivityRegistry;
100  class BranchIDListHelper;
101  class EventSetup;
102  class ExceptionCollector;
103  class ExceptionToActionTable;
104  class OutputModuleCommunicator;
105  class ProcessContext;
107  class WorkerInPath;
108  class ModuleRegistry;
109  class TriggerResultInserter;
110  class PathStatusInserter;
111  class EndPathStatusInserter;
112  class PreallocationConfiguration;
113  class WaitingTaskHolder;
114 
115  namespace service {
116  class TriggerNamesService;
117  }
118 
119  namespace {
120  template <typename T>
121  class StreamScheduleSignalSentry {
122  public:
123  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context) :
124  a_(a), context_(context), allowThrow_(false) {
125  if (a_) T::preScheduleSignal(a_, context_);
126  }
127  ~StreamScheduleSignalSentry() noexcept(false) {
128  try {
129  if (a_) { T::postScheduleSignal(a_, context_); }
130  } catch(...) {
131  if(allowThrow_) {throw;}
132  }
133  }
134 
135  void allowThrow() {
136  allowThrow_ = true;
137  }
138 
139  private:
140  // We own none of these resources.
141  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
142  typename T::Context const* context_;
143  bool allowThrow_;
144  };
145  }
146 
148  public:
149  typedef std::vector<std::string> vstring;
150  typedef std::vector<Path> TrigPaths;
151  typedef std::vector<Path> NonTrigPaths;
152  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
153  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
154  typedef std::shared_ptr<Worker> WorkerPtr;
155  typedef std::vector<Worker*> AllWorkers;
156 
157  typedef std::vector<Worker*> Workers;
158 
159  typedef std::vector<WorkerInPath> PathWorkers;
160 
161  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
162  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
163  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
164  std::shared_ptr<ModuleRegistry>,
165  ParameterSet& proc_pset,
166  service::TriggerNamesService const& tns,
167  PreallocationConfiguration const& prealloc,
168  ProductRegistry& pregistry,
169  BranchIDListHelper& branchIDListHelper,
171  std::shared_ptr<ActivityRegistry> areg,
172  std::shared_ptr<ProcessConfiguration> processConfiguration,
173  bool allowEarlyDelete,
174  StreamID streamID,
175  ProcessContext const* processContext);
176 
177  StreamSchedule(StreamSchedule const&) = delete;
178 
179  void processOneEventAsync(WaitingTaskHolder iTask,
180  EventPrincipal& ep,
181  EventSetup const& es,
182  ServiceToken const& token,
183  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
184 
185  template <typename T>
186  void processOneStreamAsync(WaitingTaskHolder iTask,
187  typename T::MyPrincipal& principal,
188  EventSetup const& eventSetup,
189  ServiceToken const& token,
190  bool cleaningUpAfterException = false);
191 
192  void beginStream();
193  void endStream();
194 
195  StreamID streamID() const { return streamID_; }
196 
199 
203  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
204 
206  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
207 
209  void modulesInPath(std::string const& iPathLabel,
210  std::vector<std::string>& oLabelsToFill) const;
211 
212  void moduleDescriptionsInPath(std::string const& iPathLabel,
213  std::vector<ModuleDescription const*>& descriptions,
214  unsigned int hint) const;
215 
216  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
217  std::vector<ModuleDescription const*>& descriptions,
218  unsigned int hint) const;
219 
223  int totalEvents() const {
224  return total_events_;
225  }
226 
229  int totalEventsPassed() const {
230  return total_passed_;
231  }
232 
235  int totalEventsFailed() const {
236  return totalEvents() - totalEventsPassed();
237  }
238 
241  void enableEndPaths(bool active);
242 
245  bool endPathsEnabled() const;
246 
249  void getTriggerReport(TriggerReport& rep) const;
250 
252  void clearCounters();
253 
255  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
256 
258  AllWorkers const& allWorkers() const {
259  return workerManager_.allWorkers();
260  }
261 
262  unsigned int numberOfUnscheduledModules() const {
263  return number_of_unscheduled_modules_;
264  }
265 
266  StreamContext const& context() const { return streamContext_;}
267  private:
268  //Sentry class to only send a signal if an
269  // exception occurs. An exception is identified
270  // by the destructor being called without first
271  // calling completedSuccessfully().
273  public:
275  reg_(iReg),
276  context_(iContext){}
278  if(reg_) {
279  reg_->preStreamEarlyTerminationSignal_(*context_,TerminationOrigin::ExceptionFromThisContext);
280  }
281  }
283  reg_ = nullptr;
284  }
285  private:
286  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
288  };
289 
292  return workerManager_.actionTable();
293  }
294 
295 
296  void resetAll();
297 
298  void finishedPaths(std::exception_ptr, WaitingTaskHolder,
299  EventPrincipal& ep, EventSetup const& es);
300  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
301 
302  void reportSkipped(EventPrincipal const& ep) const;
303 
304  void fillWorkers(ParameterSet& proc_pset,
305  ProductRegistry& preg,
306  PreallocationConfiguration const* prealloc,
307  std::shared_ptr<ProcessConfiguration const> processConfiguration,
308  std::string const& name, bool ignoreFilters, PathWorkers& out,
309  std::vector<std::string> const& endPathNames);
310  void fillTrigPath(ParameterSet& proc_pset,
311  ProductRegistry& preg,
312  PreallocationConfiguration const* prealloc,
313  std::shared_ptr<ProcessConfiguration const> processConfiguration,
314  int bitpos, std::string const& name, TrigResPtr,
315  std::vector<std::string> const& endPathNames);
316  void fillEndPath(ParameterSet& proc_pset,
317  ProductRegistry& preg,
318  PreallocationConfiguration const* prealloc,
319  std::shared_ptr<ProcessConfiguration const> processConfiguration,
320  int bitpos, std::string const& name,
321  std::vector<std::string> const& endPathNames);
322 
323  void addToAllWorkers(Worker* w);
324 
325  void resetEarlyDelete();
326  void initializeEarlyDelete(ModuleRegistry & modReg,
327  edm::ParameterSet const& opts,
328  edm::ProductRegistry const& preg,
329  bool allowEarlyDelete);
330 
331  TrigResConstPtr results() const {return get_underlying_safe(results_);}
332  TrigResPtr& results() {return get_underlying_safe(results_);}
333 
334  void makePathStatusInserters(
335  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
336  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
338 
340  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
341 
343 
345  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
346  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
347 
348  TrigPaths trig_paths_;
349  TrigPaths end_paths_;
350  std::vector<int> empty_trig_paths_;
351  std::vector<int> empty_end_paths_;
352 
353  //For each branch that has been marked for early deletion
354  // keep track of how many modules are left that read this data but have
355  // not yet been run in this event
356  std::vector<BranchToCount> earlyDeleteBranchToCount_;
357  //NOTE the following is effectively internal data for each EarlyDeleteHelper
358  // but putting it into one vector makes for better allocation as well as
359  // faster iteration when used to reset the earlyDeleteBranchToCount_
360  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
361  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
362  // tell which EarlyDeleteHelper is associated with which BranchIDs.
363  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
364  //There is one EarlyDeleteHelper per Module which are reading data that
365  // has been marked for early deletion
366  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
367 
371 
374  volatile bool endpathsAreActive_;
375  std::atomic<bool> skippingEvent_;
376  };
377 
378  void
379  inline
381  Service<JobReport> reportSvc;
382  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
383  }
384 
385  template <typename T>
387  typename T::MyPrincipal& ep,
388  EventSetup const& es,
389  ServiceToken const& token,
390  bool cleaningUpAfterException) {
391  T::setStreamContext(streamContext_, ep);
392 
393  auto id = ep.id();
394  auto doneTask = make_waiting_task(tbb::task::allocate_root(),
395  [this,iHolder, id,cleaningUpAfterException,token](std::exception_ptr const* iPtr) mutable
396  {
397  std::exception_ptr excpt;
398  if(iPtr) {
399  excpt = *iPtr;
400  //add context information to the exception and print message
401  try {
402  convertException::wrap([&]() {
403  std::rethrow_exception(excpt);
404  });
405  } catch(cms::Exception& ex) {
406  //TODO: should add the transition type info
407  std::ostringstream ost;
408  if(ex.context().empty()) {
409  ost<<"Processing "<<T::transitionName()<<" "<<id;
410  }
411  ServiceRegistry::Operate op(token);
412  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
413  excpt = std::current_exception();
414  }
415 
416  ServiceRegistry::Operate op(token);
417  actReg_->preStreamEarlyTerminationSignal_(streamContext_,TerminationOrigin::ExceptionFromThisContext);
418  }
419 
420  try {
421  ServiceRegistry::Operate op(token);
422  T::postScheduleSignal(actReg_.get(), &streamContext_);
423  } catch(...) {
424  if(not excpt) {
425  excpt = std::current_exception();
426  }
427  }
428  iHolder.doneWaiting(excpt);
429 
430  });
431 
432  auto task = make_functor_task(tbb::task::allocate_root(), [this,doneTask, h =WaitingTaskHolder(doneTask) ,&ep,&es,token] () mutable {
433  ServiceRegistry::Operate op(token);
434  try {
435  T::preScheduleSignal(actReg_.get(), &streamContext_);
436 
437  workerManager_.resetAll();
438  }catch(...) {
439  h.doneWaiting(std::current_exception());
440  return;
441  }
442 
443  for(auto& p : end_paths_) {
444  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
445  }
446 
447  for(auto& p : trig_paths_) {
448  p.runAllModulesAsync<T>(doneTask, ep, es, token, streamID_, &streamContext_);
449  }
450 
451  workerManager_.processOneOccurrenceAsync<T>(doneTask,
452  ep, es, token, streamID_, &streamContext_, &streamContext_);
453  });
454 
455  if(streamID_.value() == 0) {
456  //Enqueueing will start another thread if there is only
457  // one thread in the job. Having stream == 0 use spawn
458  // avoids starting up another thread when there is only one stream.
459  tbb::task::spawn( *task);
460  } else {
461  tbb::task::enqueue( *task);
462  }
463  }
464 }
465 
466 #endif
RunNumber_t run() const
Definition: EventID.h:39
EventNumber_t event() const
Definition: EventID.h:41
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
roAction_t actions[nactions]
Definition: GenABIO.cc:187
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
const double w
Definition: UKUtility.cc:23
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
std::shared_ptr< HLTGlobalStatus > TrigResPtr
FunctorTask< F > * make_functor_task(ALLOC &&iAlloc, F f)
Definition: FunctorTask.h:47
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::MyPrincipal &principal, EventSetup const &eventSetup, ServiceToken const &token, bool cleaningUpAfterException=false)
volatile bool endpathsAreActive_
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
#define noexcept
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
std::shared_ptr< Worker > WorkerPtr
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
def principal(options)
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::vector< Worker * > AllWorkers
rep
Definition: cuy.py:1188
TrigResPtr & results()
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
std::atomic< bool > skippingEvent_
HLT enums.
double a
Definition: hdecay.h:121
StreamContext const & context() const
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
auto wrap(F iFunc) -> decltype(iFunc())
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_