CMS 3D CMS Logo

Path.h
Go to the documentation of this file.
1 #ifndef FWCore_Framework_Path_h
2 #define FWCore_Framework_Path_h
3 
4 /*
5  Author: Jim Kowalkowski 28-01-06
6 
7  An object of this type represents one path in a job configuration.
8  It holds the assigned bit position and the list of workers that are
9  an event must pass through when this parh is processed. The workers
10  are held in WorkerInPath wrappers so that per path execution statistics
11  can be kept for each worker.
12 */
13 
23 
24 #include <memory>
25 
26 #include <string>
27 #include <vector>
28 #include <map>
29 #include <exception>
30 #include <sstream>
31 
32 namespace edm {
33  class EventPrincipal;
34  class ModuleDescription;
35  class RunPrincipal;
36  class LuminosityBlockPrincipal;
37  class EarlyDeleteHelper;
38  class StreamContext;
39  class StreamID;
40  class WaitingTask;
41 
42  class Path {
43  public:
45 
46  typedef std::vector<WorkerInPath> WorkersInPath;
48  typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
49 
50  Path(int bitpos, std::string const& path_name,
51  WorkersInPath const& workers,
52  TrigResPtr trptr,
54  std::shared_ptr<ActivityRegistry> reg,
55  StreamContext const* streamContext,
56  std::atomic<bool>* stopProcessEvent,
57  PathContext::PathType pathType);
58 
59  Path(Path const&);
60 
61  template <typename T>
62  void processOneOccurrence(typename T::MyPrincipal const&, EventSetup const&,
63  StreamID const&, typename T::Context const*);
64 
65  template <typename T>
67  typename T::MyPrincipal const&,
68  EventSetup const&,
69  StreamID const&,
70  typename T::Context const*);
71 
73 
74  int bitPosition() const { return bitpos_; }
75  std::string const& name() const { return pathContext_.pathName(); }
76 
77  void clearCounters();
78 
79  int timesRun() const { return timesRun_; }
80  int timesPassed() const { return timesPassed_; }
81  int timesFailed() const { return timesFailed_; }
82  int timesExcept() const { return timesExcept_; }
83  //int abortWorker() const { return abortWorker_; }
84  State state() const { return state_; }
85 
86  size_type size() const { return workers_.size(); }
87  int timesVisited(size_type i) const { return workers_.at(i).timesVisited(); }
88  int timesPassed (size_type i) const { return workers_.at(i).timesPassed() ; }
89  int timesFailed (size_type i) const { return workers_.at(i).timesFailed() ; }
90  int timesExcept (size_type i) const { return workers_.at(i).timesExcept() ; }
91  Worker const* getWorker(size_type i) const { return workers_.at(i).getWorker(); }
92 
93  void setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const&);
94 
95  private:
96 
97  // If you define this be careful about the pointer in the
98  // PlaceInPathContext object in the contained WorkerInPath objects.
99  Path const& operator=(Path const&) = delete; // stop default
100 
105  //int abortWorker_;
106  State state_;
107 
108  int bitpos_;
109  TrigResPtr trptr_;
110  std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
112 
113  WorkersInPath workers_;
114  std::vector<EarlyDeleteHelper*> earlyDeleteHelpers_;
115 
118  std::atomic<bool>* stopProcessingEvent_;
119 
120 
121 
122  // Helper functions
123  // nwrwue = numWorkersRunWithoutUnhandledException (really!)
125  int nwrwue,
126  bool isEvent,
127  bool begin,
129  ModuleDescription const&,
130  std::string const& id);
131  static void exceptionContext(cms::Exception & ex,
132  bool isEvent,
133  bool begin,
135  ModuleDescription const&,
136  std::string const& id,
137  PathContext const&);
138  void recordStatus(int nwrwue, bool isEvent);
139  void updateCounters(bool succeed, bool isEvent);
140 
141  void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr,
142  StreamContext const*);
143 
144  void handleEarlyFinish(EventPrincipal const&);
147 
148  //Handle asynchronous processing
149  void workerFinished(std::exception_ptr const* iException,
150  unsigned int iModuleIndex,
151  EventPrincipal const& iEP, EventSetup const& iES,
152  StreamID const& iID, StreamContext const* iContext);
153  void runNextWorkerAsync(unsigned int iNextModuleIndex,
154  EventPrincipal const&, EventSetup const&,
155  StreamID const&, StreamContext const*);
156 
157  };
158 
159  namespace {
160  template <typename T>
161  class PathSignalSentry {
162  public:
163  PathSignalSentry(ActivityRegistry *a,
164  int const& nwrwue,
165  hlt::HLTState const& state,
166  PathContext const* pathContext) :
167  a_(a), nwrwue_(nwrwue), state_(state), pathContext_(pathContext) {
168  if (a_) T::prePathSignal(a_, pathContext_);
169  }
170  ~PathSignalSentry() {
171  HLTPathStatus status(state_, nwrwue_);
172  if(a_) T::postPathSignal(a_, status, pathContext_);
173  }
174  private:
175  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
176  int const& nwrwue_;
177  hlt::HLTState const& state_;
178  PathContext const* pathContext_;
179  };
180  }
181 
182  template <typename T>
184  typename T::MyPrincipal const& p,
185  EventSetup const& es,
186  StreamID const& streamID,
187  typename T::Context const* context) {
188  for(auto& worker: workers_) {
189  worker.runWorkerAsync<T>(task,p,es,streamID,context);
190  }
191  }
192 
193  template <typename T>
194  void Path::processOneOccurrence(typename T::MyPrincipal const& ep, EventSetup const& es,
195  StreamID const& streamID, typename T::Context const* context) {
196 
197  int nwrwue = -1;
198  PathSignalSentry<T> signaler(actReg_.get(), nwrwue, state_, &pathContext_);
199 
200  if (T::isEvent_) {
201  ++timesRun_;
202  }
203  state_ = hlt::Ready;
204 
205  // nwrue = numWorkersRunWithoutUnhandledException
206  bool should_continue = true;
207  WorkersInPath::iterator i = workers_.begin(), end = workers_.end();
208 
209  auto earlyFinishSentry = make_sentry(this,[&i,end, &ep](Path*){
210  for(auto j=i; j!= end;++j) {
211  j->skipWorker(ep);
212  }
213  });
214  for (;
215  i != end && should_continue;
216  ++i) {
217  ++nwrwue;
218  try {
219  convertException::wrap([&]() {
220  should_continue = i->runWorker<T>(ep, es, streamID, context);
221  });
222  }
223  catch(cms::Exception& ex) {
224  // handleWorkerFailure may throw a new exception.
225  std::ostringstream ost;
226  ost << ep.id();
227  should_continue = handleWorkerFailure(ex, nwrwue, T::isEvent_, T::begin_, T::branchType_,
228  i->getWorker()->description(), ost.str());
229  //If we didn't rethrow, then we effectively skipped
230  i->skipWorker(ep);
231  }
232  }
233  if (not should_continue) {
234  handleEarlyFinish(ep);
235  }
236  updateCounters(should_continue, T::isEvent_);
237  recordStatus(nwrwue, T::isEvent_);
238  }
239 
240 }
241 
242 #endif
std::string const & pathName() const
Definition: PathContext.h:37
int timesRun() const
Definition: Path.h:79
std::unique_ptr< T, F > make_sentry(T *iObject, F iFunc)
NOTE: if iObject is null, then iFunc will not be called.
Definition: make_sentry.h:29
int bitPosition() const
Definition: Path.h:74
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:146
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:188
not [yet] run
Definition: HLTenums.h:18
roAction_t actions[nactions]
Definition: GenABIO.cc:187
int timesFailed(size_type i) const
Definition: Path.h:89
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:114
int timesFailed_
Definition: Path.h:103
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:46
hlt::HLTState State
Definition: Path.h:44
WorkersInPath::size_type size_type
Definition: Path.h:47
HLTState
status of a trigger path
Definition: HLTenums.h:18
int timesPassed(size_type i) const
Definition: Path.h:88
int timesExcept() const
Definition: Path.h:82
int timesExcept_
Definition: Path.h:104
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:286
PathContext pathContext_
Definition: Path.h:116
size_type size() const
Definition: Path.h:86
WaitingTaskList waitingTasks_
Definition: Path.h:117
State state() const
Definition: Path.h:84
uint16_t size_type
BranchType
Definition: BranchType.h:11
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:110
TrigResPtr trptr_
Definition: Path.h:109
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:217
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:48
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:195
int bitpos_
Definition: Path.h:108
Definition: Path.h:42
int timesPassed() const
Definition: Path.h:80
#define end
Definition: vmac.h:37
void handleEarlyFinish(LuminosityBlockPrincipal const &)
Definition: Path.h:146
void handleEarlyFinish(RunPrincipal const &)
Definition: Path.h:145
WorkersInPath workers_
Definition: Path.h:113
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:111
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:153
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:12
int timesVisited(size_type i) const
Definition: Path.h:87
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:60
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:171
void runAllModulesAsync(WaitingTask *, typename T::MyPrincipal const &, EventSetup const &, StreamID const &, typename T::Context const *)
Definition: Path.h:183
ExceptionToActionTable const * act_table_
Definition: Path.h:111
Path const & operator=(Path const &)=delete
State state_
Definition: Path.h:106
std::string const & name() const
Definition: Path.h:75
#define begin
Definition: vmac.h:30
HLT enums.
double a
Definition: hdecay.h:121
Worker const * getWorker(size_type i) const
Definition: Path.h:91
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *)
Definition: Path.cc:268
auto wrap(F iFunc) -> decltype(iFunc())
int timesExcept(size_type i) const
Definition: Path.h:90
long double T
int timesPassed_
Definition: Path.h:102
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:118
int timesRun_
Definition: Path.h:101
def branchType(schema, name)
Definition: revisionDML.py:112
void clearCounters()
Definition: Path.cc:164
int timesFailed() const
Definition: Path.h:81
void processOneOccurrence(typename T::MyPrincipal const &, EventSetup const &, StreamID const &, typename T::Context const *)
Definition: Path.h:194