CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamSchedule.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_StreamSchedule_h
2 #define FWCore_Framework_StreamSchedule_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  A class for creating a schedule based on paths in the configuration file.
8  The schedule is maintained as a sequence of paths.
9  After construction, events can be fed to the object and passed through
10  all the modules in the schedule. All accounting about processing
11  of events by modules and paths is contained here or in object held
12  by containment.
13 
14  The trigger results producer and product are generated and managed here.
15  This class also manages endpaths and calls to endjob and beginjob.
16  Endpaths are just treated as a simple list of modules that need to
17  do processing of the event and do not participate in trigger path
18  activities.
19 
20  This class requires the high-level process pset. It uses @process_name.
21  If the high-level pset contains an "options" pset, then the
22  following optional parameter can be present:
23  bool wantSummary = true/false # default false
24 
25  wantSummary indicates whether or not the pass/fail/error stats
26  for modules and paths should be printed at the end-of-job.
27 
28  A TriggerResults object will always be inserted into the event
29  for any schedule. The producer of the TriggerResults EDProduct
30  is always the first module in the endpath. The TriggerResultInserter
31  is given a fixed label of "TriggerResults".
32 
33  Processing of an event happens by pushing the event through the Paths.
34  The scheduler performs the reset() on each of the workers independent
35  of the Path objects.
36 
37  ------------------------
38 
39  About Paths:
40  Paths fit into two categories:
41  1) trigger paths that contribute directly to saved trigger bits
42  2) end paths
43  The StreamSchedule holds these paths in two data structures:
44  1) main path list
45  2) end path list
46 
47  Trigger path processing always precedes endpath processing.
48  The order of the paths from the input configuration is
49  preserved in the main paths list.
50 
51  ------------------------
52 
53  The StreamSchedule uses the TriggerNamesService to get the names of the
54  trigger paths and end paths. When a TriggerResults object is created
55  the results are stored in the same order as the trigger names from
56  TriggerNamesService.
57 
58 */
59 
85 
86 #include "boost/shared_ptr.hpp"
87 
88 #include <map>
89 #include <memory>
90 #include <set>
91 #include <string>
92 #include <vector>
93 #include <sstream>
94 
95 namespace edm {
96 
97  class ActivityRegistry;
98  class BranchIDListHelper;
99  class EventSetup;
100  class ExceptionCollector;
101  class OutputModuleCommunicator;
102  class ProcessContext;
103  class RunStopwatch;
104  class UnscheduledCallProducer;
105  class WorkerInPath;
106  class TriggerTimingReport;
107  class ModuleRegistry;
108  class TriggerResultInserter;
109  class PreallocationConfiguration;
110 
111  namespace service {
112  class TriggerNamesService;
113  }
114 
115  namespace {
116  template <typename T>
117  class StreamScheduleSignalSentry {
118  public:
119  StreamScheduleSignalSentry(ActivityRegistry* a, typename T::MyPrincipal* principal, EventSetup const* es, typename T::Context const* context) :
120  a_(a), principal_(principal), es_(es), context_(context), allowThrow_(false) {
121  if (a_) T::preScheduleSignal(a_, principal_, context_);
122  }
123  ~StreamScheduleSignalSentry() noexcept(false) {
124  try {
125  if (a_ and principal_) { T::postScheduleSignal(a_, principal_, es_, context_); }
126  } catch(...) {
127  if(allowThrow_) {throw;}
128  }
129  }
130 
131  void allowThrow() {
132  allowThrow_ = true;
133  }
134 
135  private:
136  // We own none of these resources.
137  ActivityRegistry* a_;
138  typename T::MyPrincipal* principal_;
139  EventSetup const* es_;
140  typename T::Context const* context_;
141  bool allowThrow_;
142  };
143  }
144 
146  public:
147  typedef std::vector<std::string> vstring;
148  typedef std::vector<Path> TrigPaths;
149  typedef std::vector<Path> NonTrigPaths;
150  typedef boost::shared_ptr<HLTGlobalStatus> TrigResPtr;
151  typedef boost::shared_ptr<Worker> WorkerPtr;
152  typedef std::vector<Worker*> AllWorkers;
153  typedef std::vector<boost::shared_ptr<OutputModuleCommunicator>> AllOutputModuleCommunicators;
154 
155  typedef std::vector<Worker*> Workers;
156 
157  typedef std::vector<WorkerInPath> PathWorkers;
158 
160  boost::shared_ptr<ModuleRegistry>,
161  ParameterSet& proc_pset,
164  ProductRegistry& pregistry,
165  BranchIDListHelper& branchIDListHelper,
167  boost::shared_ptr<ActivityRegistry> areg,
168  boost::shared_ptr<ProcessConfiguration> processConfiguration,
169  bool allowEarlyDelete,
171  ProcessContext const* processContext);
172 
173  StreamSchedule(StreamSchedule const&) = delete;
174 
175  template <typename T>
176  void processOneEvent(typename T::MyPrincipal& principal,
177  EventSetup const& eventSetup,
178  bool cleaningUpAfterException = false);
179 
180  template <typename T>
181  void processOneStream(typename T::MyPrincipal& principal,
182  EventSetup const& eventSetup,
183  bool cleaningUpAfterException = false);
184 
185  void beginStream();
186  void endStream();
187 
188  StreamID streamID() const { return streamID_; }
189 
190  std::pair<double, double> timeCpuReal() const {
191  return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
192  }
193 
196 
200  std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
201 
203  void availablePaths(std::vector<std::string>& oLabelsToFill) const;
204 
206  void modulesInPath(std::string const& iPathLabel,
207  std::vector<std::string>& oLabelsToFill) const;
208 
212  int totalEvents() const {
213  return total_events_;
214  }
215 
218  int totalEventsPassed() const {
219  return total_passed_;
220  }
221 
224  int totalEventsFailed() const {
225  return totalEvents() - totalEventsPassed();
226  }
227 
230  void enableEndPaths(bool active);
231 
234  bool endPathsEnabled() const;
235 
238  void getTriggerReport(TriggerReport& rep) const;
239 
243 
245  void clearCounters();
246 
248  void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
249 
251  AllWorkers const& allWorkers() const {
252  return workerManager_.allWorkers();
253  }
254 
255  unsigned int numberOfUnscheduledModules() const {
257  }
258 
259  private:
262  return workerManager_.actionTable();
263  }
264 
265 
266  void resetAll();
267 
268  template <typename T>
269  bool runTriggerPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
270 
271  template <typename T>
272  void runEndPaths(typename T::MyPrincipal&, EventSetup const&, typename T::Context const*);
273 
274  void reportSkipped(EventPrincipal const& ep) const;
275 
276  void fillWorkers(ParameterSet& proc_pset,
279  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
280  std::string const& name, bool ignoreFilters, PathWorkers& out,
281  vstring* labelsOnPaths);
282  void fillTrigPath(ParameterSet& proc_pset,
285  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
286  int bitpos, std::string const& name, TrigResPtr,
287  vstring* labelsOnTriggerPaths);
288  void fillEndPath(ParameterSet& proc_pset,
291  boost::shared_ptr<ProcessConfiguration const> processConfiguration,
292  int bitpos, std::string const& name);
293 
294  void addToAllWorkers(Worker* w);
295 
296  void resetEarlyDelete();
298  edm::ParameterSet const& opts,
299  edm::ProductRegistry const& preg,
300  bool allowEarlyDelete);
301 
303  boost::shared_ptr<ActivityRegistry> actReg_;
304 
307 
309 
313  std::vector<int> empty_trig_paths_;
315 
316  //For each branch that has been marked for early deletion
317  // keep track of how many modules are left that read this data but have
318  // not yet been run in this event
319  std::vector<std::pair<BranchID,unsigned int>> earlyDeleteBranchToCount_;
320  //NOTE the following is effectively internal data for each EarlyDeleteHelper
321  // but putting it into one vector makes for better allocation as well as
322  // faster iteration when used to reset the earlyDeleteBranchToCount_
323  // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
324  // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
325  // tell which EarlyDeleteHelper is associated with which BranchIDs.
326  std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
327  //There is one EarlyDeleteHelper per Module which are reading data that
328  // has been marked for early deletion
329  std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
330 
335 
339  volatile bool endpathsAreActive_;
340  };
341 
342  void
343  inline
345  Service<JobReport> reportSvc;
346  reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
347  }
348 
349  template <typename T>
350  void StreamSchedule::processOneEvent(typename T::MyPrincipal& ep,
351  EventSetup const& es,
352  bool cleaningUpAfterException) {
353  this->resetAll();
354  for (int empty_trig_path : empty_trig_paths_) {
355  results_->at(empty_trig_path) = HLTPathStatus(hlt::Pass, 0);
356  }
357 
358  T::setStreamContext(streamContext_, ep);
359  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &ep, &es, &streamContext_);
360 
361  // A RunStopwatch, but only if we are processing an event.
362  RunStopwatch stopwatch(stopwatch_);
363 
364  // This call takes care of the unscheduled processing.
365  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
366 
367  ++total_events_;
368  try {
369  try {
370  try {
371  if (runTriggerPaths<T>(ep, es, &streamContext_)) {
372  ++total_passed_;
373  }
374  }
375  catch(cms::Exception& e) {
377  assert (action != exception_actions::IgnoreCompletely);
378  assert (action != exception_actions::FailPath);
379  if (action == exception_actions::SkipEvent) {
380  edm::printCmsExceptionWarning("SkipEvent", e);
381  } else {
382  throw;
383  }
384  }
385 
386  try {
387  CPUTimer timer;
388  ParentContext parentContext(&streamContext_);
389  if (results_inserter_.get()) results_inserter_->doWork<T>(ep, es, &timer,streamID_, parentContext, &streamContext_);
390  }
391  catch (cms::Exception & ex) {
392  if (T::isEvent_) {
393  ex.addContext("Calling produce method for module TriggerResultInserter");
394  }
395  std::ostringstream ost;
396  ost << "Processing " << ep.id();
397  ex.addContext(ost.str());
398  throw;
399  }
400 
401  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
403  }
404  catch (cms::Exception& e) { throw; }
405  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
406  catch (std::exception& e) { convertException::stdToEDM(e); }
408  catch(char const* c) { convertException::charPtrToEDM(c); }
409  catch (...) { convertException::unknownToEDM(); }
410  }
411  catch(cms::Exception& ex) {
412  if (ex.context().empty()) {
413  addContextAndPrintException("Calling function StreamSchedule::processOneEvent", ex, cleaningUpAfterException);
414  } else {
415  addContextAndPrintException("", ex, cleaningUpAfterException);
416  }
417  throw;
418  }
419  //If we got here no other exception has happened so we can propogate any Service related exceptions
420  sentry.allowThrow();
421  }
422 
423  template <typename T>
424  void StreamSchedule::processOneStream(typename T::MyPrincipal& ep,
425  EventSetup const& es,
426  bool cleaningUpAfterException) {
427  this->resetAll();
428 
429  T::setStreamContext(streamContext_, ep);
430  StreamScheduleSignalSentry<T> sentry(actReg_.get(), &ep, &es, &streamContext_);
431 
432  // This call takes care of the unscheduled processing.
433  workerManager_.processOneOccurrence<T>(ep, es, streamID_, &streamContext_, &streamContext_, cleaningUpAfterException);
434 
435  try {
436  try {
437  runTriggerPaths<T>(ep, es, &streamContext_);
438 
439  if (endpathsAreActive_) runEndPaths<T>(ep, es, &streamContext_);
440  }
441  catch (cms::Exception& e) { throw; }
442  catch(std::bad_alloc& bda) { convertException::badAllocToEDM(); }
443  catch (std::exception& e) { convertException::stdToEDM(e); }
445  catch(char const* c) { convertException::charPtrToEDM(c); }
446  catch (...) { convertException::unknownToEDM(); }
447  }
448  catch(cms::Exception& ex) {
449  if (ex.context().empty()) {
450  addContextAndPrintException("Calling function StreamSchedule::processOneStream", ex, cleaningUpAfterException);
451  } else {
452  addContextAndPrintException("", ex, cleaningUpAfterException);
453  }
454  throw;
455  }
456  //If we got here no other exception has happened so we can propogate any Service related exceptions
457  sentry.allowThrow();
458  }
459 
460  template <typename T>
461  bool
462  StreamSchedule::runTriggerPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
463  for(auto& p : trig_paths_) {
464  p.processOneOccurrence<T>(ep, es, streamID_, context);
465  }
466  return results_->accept();
467  }
468 
469  template <typename T>
470  void
471  StreamSchedule::runEndPaths(typename T::MyPrincipal& ep, EventSetup const& es, typename T::Context const* context) {
472  // Note there is no state-checking safety controlling the
473  // activation/deactivation of endpaths.
474  for(auto& p : end_paths_) {
475  p.processOneOccurrence<T>(ep, es, streamID_, context);
476  }
477  }
478 }
479 
480 #endif
RunNumber_t run() const
Definition: EventID.h:42
EventNumber_t event() const
Definition: EventID.h:44
std::vector< boost::shared_ptr< OutputModuleCommunicator > > AllOutputModuleCommunicators
string rep
Definition: cuy.py:1188
AllWorkers const & allWorkers() const
returns the collection of pointers to workers
void runEndPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
std::vector< Worker * > Workers
std::vector< int > empty_trig_paths_
int totalEventsFailed() const
unsigned int numberOfUnscheduledModules() const
std::vector< Path > NonTrigPaths
void initializeEarlyDelete(ModuleRegistry &modReg, edm::ParameterSet const &opts, edm::ProductRegistry const &preg, bool allowEarlyDelete)
std::vector< ModuleDescription const * > getAllModuleDescriptions() const
std::vector< std::pair< BranchID, unsigned int > > earlyDeleteBranchToCount_
void processOneOccurrence(typename T::MyPrincipal &principal, EventSetup const &eventSetup, StreamID streamID, typename T::Context const *topContext, U const *context, bool cleaningUpAfterException=false)
Definition: WorkerManager.h:92
volatile bool endpathsAreActive_
EventID const & id() const
int totalEvents() const
void addContextAndPrintException(char const *context, cms::Exception &ex, bool disablePrint)
processConfiguration
Definition: Schedule.cc:362
std::string const & category() const
Definition: Exception.cc:183
void addToAllWorkers(Worker *w)
exception_actions::ActionCodes find(const std::string &category) const
void reportSkipped(EventPrincipal const &ep) const
bool runTriggerPaths(typename T::MyPrincipal &, EventSetup const &, typename T::Context const *)
ExceptionToActionTable const & actionTable() const
returns the action table
void processOneEvent(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
actions
Definition: Schedule.cc:362
unsigned int number_of_unscheduled_modules_
WorkerManager workerManager_
std::vector< WorkerInPath > PathWorkers
boost::shared_ptr< ActivityRegistry > actReg_
boost::shared_ptr< Worker > WorkerPtr
int totalEventsPassed() const
void getTriggerReport(TriggerReport &rep) const
StreamID streamID() const
boost::shared_ptr< CPUTimer > StopwatchPointer
Definition: RunStopwatch.h:22
accept
Definition: HLTenums.h:19
void stdToEDM(std::exception const &e)
std::pair< double, double > timeCpuReal() const
void fillWorkers(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, std::string const &name, bool ignoreFilters, PathWorkers &out, vstring *labelsOnPaths)
StreamContext streamContext_
std::vector< std::string > vstring
std::list< std::string > const & context() const
Definition: Exception.cc:191
void fillEndPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name)
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e, edm::JobReport *jobRep=0, int rc=-1)
std::vector< Worker * > AllWorkers
vstring empty_trig_path_names_
areg
Definition: Schedule.cc:362
tuple out
Definition: dbtoconf.py:99
void clearCounters()
Clear all the counters in the trigger report.
#define noexcept
AllWorkers const & allWorkers() const
Definition: WorkerManager.h:64
void charPtrToEDM(char const *c)
WorkerPtr results_inserter_
void stringToEDM(std::string &s)
std::vector< unsigned int > earlyDeleteHelperToBranchIndicies_
RunStopwatch::StopwatchPointer stopwatch_
boost::shared_ptr< HLTGlobalStatus > TrigResPtr
void addContext(std::string const &context)
Definition: Exception.cc:227
StreamSchedule(TriggerResultInserter *inserter, boost::shared_ptr< ModuleRegistry >, ParameterSet &proc_pset, service::TriggerNamesService &tns, PreallocationConfiguration const &prealloc, ProductRegistry &pregistry, BranchIDListHelper &branchIDListHelper, ExceptionToActionTable const &actions, boost::shared_ptr< ActivityRegistry > areg, boost::shared_ptr< ProcessConfiguration > processConfiguration, bool allowEarlyDelete, StreamID streamID, ProcessContext const *processContext)
void getTriggerTimingReport(TriggerTimingReport &rep) const
bool endPathsEnabled() const
double a
Definition: hdecay.h:121
void replaceModule(maker::ModuleHolder *iMod, std::string const &iLabel)
clone the type of module with label iLabel but configure with iPSet.
void enableEndPaths(bool active)
T w() const
volatile std::atomic< bool > shutdown_flag false
preg
Definition: Schedule.cc:362
long double T
void fillTrigPath(ParameterSet &proc_pset, ProductRegistry &preg, PreallocationConfiguration const *prealloc, boost::shared_ptr< ProcessConfiguration const > processConfiguration, int bitpos, std::string const &name, TrigResPtr, vstring *labelsOnTriggerPaths)
std::vector< Path > TrigPaths
std::vector< std::string > vstring
Definition: Schedule.cc:347
void processOneStream(typename T::MyPrincipal &principal, EventSetup const &eventSetup, bool cleaningUpAfterException=false)
ExceptionToActionTable const & actionTable() const
Definition: WorkerManager.h:68
void availablePaths(std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill the labels for all paths in the process
std::vector< EarlyDeleteHelper > earlyDeleteHelpers_
void modulesInPath(std::string const &iPathLabel, std::vector< std::string > &oLabelsToFill) const
adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel ...
prealloc
Definition: Schedule.cc:362