CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
90 
91 #include <map>
92 #include <memory>
93 #include <set>
94 #include <string>
95 #include <vector>
96 #include <sstream>
97 #include <atomic>
98 
99 namespace edm {
100 
101  class ActivityRegistry;
102  class BranchIDListHelper;
103  class ExceptionCollector;
104  class ExceptionToActionTable;
105  class OutputModuleCommunicator;
106  class ProcessContext;
108  class WorkerInPath;
109  class ModuleRegistry;
110  class TriggerResultInserter;
111  class PathStatusInserter;
112  class EndPathStatusInserter;
113  class PreallocationConfiguration;
114  class WaitingTaskHolder;
115 
116  namespace service {
117  class TriggerNamesService;
118  }
119 
120  namespace {
121  template <typename T>
122  class StreamScheduleSignalSentry {
123  public:
124  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
125  : a_(a), context_(context), allowThrow_(false) {
126  if (a_)
127  T::preScheduleSignal(a_, context_);
128  }
129  ~StreamScheduleSignalSentry() noexcept(false) {
130  // Caught exception is rethrown (when allowed)
131  CMS_SA_ALLOW try {
132  if (a_) {
133  T::postScheduleSignal(a_, context_);
134  }
135  } catch (...) {
136  if (allowThrow_) {
137  throw;
138  }
139  }
140  }
141 
142  void allowThrow() { allowThrow_ = true; }
143 
144  private:
145  // We own none of these resources.
146  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
147  typename T::Context const* context_;
148  bool allowThrow_;
149  };
150  } // namespace
151 
153  public:
154  typedef std::vector<std::string> vstring;
155  typedef std::vector<Path> TrigPaths;
156  typedef std::vector<Path> NonTrigPaths;
157  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
158  typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
159  typedef std::shared_ptr<Worker> WorkerPtr;
160  typedef std::vector<Worker*> AllWorkers;
161 
162  typedef std::vector<Worker*> Workers;
163 
164  typedef std::vector<WorkerInPath> PathWorkers;
165 
166  StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
167  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
168  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
169  std::shared_ptr<ModuleRegistry>,
170  ParameterSet& proc_pset,
173  ProductRegistry& pregistry,
174  BranchIDListHelper& branchIDListHelper,
176  std::shared_ptr<ActivityRegistry> areg,
177  std::shared_ptr<ProcessConfiguration> processConfiguration,
178  bool allowEarlyDelete,
180  ProcessContext const* processContext);
181 
182  StreamSchedule(StreamSchedule const&) = delete;
183 
185  WaitingTaskHolder iTask,
187  ServiceToken const& token,
188  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
189 
190  template <typename T>
192  typename T::TransitionInfoType& transitionInfo,
193  ServiceToken const& token,
194  bool cleaningUpAfterException = false);
195 
196  void beginStream();
197  void endStream();
198 
199  StreamID streamID() const { return streamID_; }
200 
203 
207  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
208 
210  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
211 
213  void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
214 
215  void moduleDescriptionsInPath(std::string const& iPathLabel,
216  std::vector<ModuleDescription const*>& descriptions,
217  unsigned int hint) const;
218 
219  void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
220  std::vector<ModuleDescription const*>& descriptions,
221  unsigned int hint) const;
222 
226  int totalEvents() const { return total_events_; }
227 
230  int totalEventsPassed() const { return total_passed_; }
231 
234  int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
235 
238  void getTriggerReport(TriggerReport& rep) const;
239 
241  void clearCounters();
242 
244  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
245 
247  void deleteModule(std::string const& iLabel);
248 
250  AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
251 
253 
254  StreamContext const& context() const { return streamContext_; }
255 
256  private:
257  //Sentry class to only send a signal if an
258  // exception occurs. An exception is identified
259  // by the destructor being called without first
260  // calling completedSuccessfully().
262  public:
264  : reg_(iReg), context_(iContext) {}
266  if (reg_) {
268  }
269  }
270  void completedSuccessfully() { reg_ = nullptr; }
271 
272  private:
273  edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
275  };
276 
279 
280  void resetAll();
281 
282  void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
283 
284  std::exception_ptr finishProcessOneEvent(std::exception_ptr);
285 
286  void reportSkipped(EventPrincipal const& ep) const;
287 
288  void fillWorkers(ParameterSet& proc_pset,
291  std::shared_ptr<ProcessConfiguration const> processConfiguration,
292  std::string const& name,
293  bool ignoreFilters,
294  PathWorkers& out,
295  std::vector<std::string> const& endPathNames);
296  void fillTrigPath(ParameterSet& proc_pset,
299  std::shared_ptr<ProcessConfiguration const> processConfiguration,
300  int bitpos,
301  std::string const& name,
302  TrigResPtr,
303  std::vector<std::string> const& endPathNames);
304  void fillEndPath(ParameterSet& proc_pset,
307  std::shared_ptr<ProcessConfiguration const> processConfiguration,
308  int bitpos,
309  std::string const& name,
310  std::vector<std::string> const& endPathNames);
311 
312  void addToAllWorkers(Worker* w);
313 
314  void resetEarlyDelete();
316  edm::ParameterSet const& opts,
317  edm::ProductRegistry const& preg,
318  bool allowEarlyDelete);
319 
322 
324  std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
325  std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
327 
329  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
330 
332 
334  std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
335  std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
336 
339  std::vector<int> empty_trig_paths_;
340  std::vector<int> empty_end_paths_;
341 
342  //For each branch that has been marked for early deletion
343  // keep track of how many modules are left that read this data but have
344  // not yet been run in this event
345  std::vector<BranchToCount> earlyDeleteBranchToCount_;
346  //NOTE the following is effectively internal data for each EarlyDeleteHelper
347  // but putting it into one vector makes for better allocation as well as
348  // faster iteration when used to reset the earlyDeleteBranchToCount_
349  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
350  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
351  // tell which EarlyDeleteHelper is associated with which BranchIDs.
352  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
353  //There is one EarlyDeleteHelper per Module which are reading data that
354  // has been marked for early deletion
355  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
356 
360 
363  std::atomic<bool> skippingEvent_;
364  };
365 
366  void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
367  Service<JobReport> reportSvc;
368  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
369  }
370 
371  template <typename T>
373  typename T::TransitionInfoType& transitionInfo,
374  ServiceToken const& token,
375  bool cleaningUpAfterException) {
376  auto const& principal = transitionInfo.principal();
377  T::setStreamContext(streamContext_, principal);
378 
379  auto id = principal.id();
380  ServiceWeakToken weakToken = token;
381  auto doneTask = make_waiting_task(
382  [this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
383  std::exception_ptr excpt;
384  if (iPtr) {
385  excpt = *iPtr;
386  //add context information to the exception and print message
387  try {
388  convertException::wrap([&]() { std::rethrow_exception(excpt); });
389  } catch (cms::Exception& ex) {
390  //TODO: should add the transition type info
391  std::ostringstream ost;
392  if (ex.context().empty()) {
393  ost << "Processing " << T::transitionName() << " " << id;
394  }
395  ServiceRegistry::Operate op(weakToken.lock());
396  addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
397  excpt = std::current_exception();
398  }
399 
400  ServiceRegistry::Operate op(weakToken.lock());
401  actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
402  }
403  // Caught exception is propagated via WaitingTaskHolder
404  CMS_SA_ALLOW try {
405  ServiceRegistry::Operate op(weakToken.lock());
406  T::postScheduleSignal(actReg_.get(), &streamContext_);
407  } catch (...) {
408  if (not excpt) {
409  excpt = std::current_exception();
410  }
411  }
412  iHolder.doneWaiting(excpt);
413  });
414 
415  auto task = make_functor_task(
416  [this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
417  auto token = weakToken.lock();
418  ServiceRegistry::Operate op(token);
419  // Caught exception is propagated via WaitingTaskHolder
420  CMS_SA_ALLOW try {
421  T::preScheduleSignal(actReg_.get(), &streamContext_);
422 
424  } catch (...) {
425  h.doneWaiting(std::current_exception());
426  return;
427  }
428 
429  for (auto& p : end_paths_) {
430  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
431  }
432 
433  for (auto& p : trig_paths_) {
434  p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
435  }
436 
438  });
439 
440  if (streamID_.value() == 0) {
441  //Enqueueing will start another thread if there is only
442  // one thread in the job. Having stream == 0 use spawn
443  // avoids starting up another thread when there is only one stream.
444  iHolder.group()->run([task]() {
445  TaskSentry s{task};
446  task->execute();
447  });
448  } else {
449  tbb::task_arena arena{tbb::task_arena::attach()};
450  arena.enqueue([task]() {
451  TaskSentry s{task};
452  task->execute();
453  });
454  }
455  }
456 } // namespace edm
457 
458 #endif
RunNumber_t run() const
Definition: EventID.h:38
void moduleDescriptionsInEndPath(std::string const &iEndPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
EventNumber_t event() const
Definition: EventID.h:40
ServiceToken lock() const
Definition: ServiceToken.h:101
string rep
Definition: cuy.py:1189
pathNames_ & tns()), endPathNames_(&tns.getEndPaths()), wantSummary_(tns.wantSummary()
Definition: Schedule.cc:691
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
std::shared_ptr< HLTGlobalStatus const > TrigResConstPtr
std::vector< Worker * > Workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::vector< int > empty_trig_paths_
void moduleDescriptionsInPath(std::string const &iPathLabel, std::vector< ModuleDescription const * > &descriptions, unsigned int hint) const
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, std::vector< std::string > const &endPathNames)
std::vector< edm::propagate_const< WorkerPtr > > pathStatusInserterWorkers_
const double w
Definition: UKUtility.cc:23
uint16_t *__restrict__ id
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::shared_ptr< HLTGlobalStatus > TrigResPtr
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
void processOneEventAsync(WaitingTaskHolder iTask, EventTransitionInfo &, ServiceToken const &token, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters)
std::vector< BranchToCount > earlyDeleteBranchToCount_
EventID const & id() const
int totalEvents() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:687
void addToAllWorkers(Worker *w)
void reportSkipped(EventPrincipal const &ep) const
edm::propagate_const< WorkerPtr > results_inserter_
ExceptionToActionTable const & actionTable() const
returns the action table
constexpr std::shared_ptr< T > & get_underlying_safe(propagate_const< std::shared_ptr< T >> &iP)
std::shared_ptr< Worker > WorkerPtr
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, std::vector< std::string > const &endPathNames)
actions
Definition: Schedule.cc:687
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::shared_ptr< ActivityRegistry > actReg_
void finishedPaths(std::atomic< std::exception_ptr * > &, WaitingTaskHolder, EventTransitionInfo &)
void processOneOccurrenceAsync(WaitingTaskHolder, typename T::TransitionInfoType &, ServiceToken const &, StreamID, typename T::Context const *topContext, U const *context)
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, std::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, std::vector< std::string > const &endPathNames)
std::vector< WorkerInPath > PathWorkers
int totalEventsPassed() const
std::vector< int > empty_end_paths_
void getTriggerReport(TriggerReport &rep) const
void doneWaiting(std::exception_ptr iExcept)
StreamID streamID() const
PreStreamEarlyTermination preStreamEarlyTerminationSignal_
SendTerminationSignalIfException(edm::ActivityRegistry *iReg, edm::StreamContext const *iContext)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:147
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
std::vector< Worker * > AllWorkers
std::exception_ptr finishProcessOneEvent(std::exception_ptr)
void deleteModule(std::string const &iLabel)
Delete the module with label iLabel.
areg
Definition: Schedule.cc:687
void clearCounters()
Clear all the counters in the trigger report.
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:85
TrigResPtr & results()
unsigned int value() const
Definition: StreamID.h:43
void processOneStreamAsync(WaitingTaskHolder iTask, typename T::TransitionInfoType &transitionInfo, ServiceToken const &token, bool cleaningUpAfterException=false)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
edm::propagate_const< TrigResPtr > results_
FunctorTask< F > * make_functor_task(F f)
Definition: FunctorTask.h:44
tbb::task_group * group() const noexcept
StreamSchedule(std::shared_ptr< TriggerResultInserter > inserter, std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, std::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService const &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > areg, std::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
std::atomic< bool > skippingEvent_
double a
Definition: hdecay.h:119
StreamContext const & context() const
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
std::vector< edm::propagate_const< WorkerPtr > > endPathStatusInserterWorkers_
void makePathStatusInserters(std::vector< edm::propagate_const< std::shared_ptr< PathStatusInserter >>> &pathStatusInserters, std::vector< edm::propagate_const< std::shared_ptr< EndPathStatusInserter >>> &endPathStatusInserters, ExceptionToActionTable const &actions)
auto wrap(F iFunc) -> decltype(iFunc())
preg
Definition: Schedule.cc:687
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
long double T
TrigResConstPtr results() const
std::vector< Path > TrigPaths
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:89
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:687