CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
11 
12 #include <algorithm>
13 
14 namespace edm {
15  Path::Path(int bitpos,
16  std::string const& path_name,
17  WorkersInPath const& workers,
18  TrigResPtr trptr,
20  std::shared_ptr<ActivityRegistry> areg,
21  StreamContext const* streamContext,
22  std::atomic<bool>* stopProcessingEvent,
23  PathContext::PathType pathType)
24  : timesRun_(),
25  timesPassed_(),
26  timesFailed_(),
27  timesExcept_(),
28  state_(hlt::Ready),
29  bitpos_(bitpos),
30  trptr_(trptr),
31  actReg_(areg),
32  act_table_(&actions),
33  workers_(workers),
34  pathContext_(path_name, streamContext, bitpos, pathType),
35  stopProcessingEvent_(stopProcessingEvent),
36  pathStatusInserter_(nullptr),
37  pathStatusInserterWorker_(nullptr) {
38  for (auto& workerInPath : workers_) {
39  workerInPath.setPathContext(&pathContext_);
40  }
41  }
42 
// Copy constructor. The initializers visible here copy the run counter, state, trigger-bit
// position, trigger-results pointer, activity registry and worker list from r.
// NOTE(review): listing lines 45-47, 52 and 54-58 are absent from this extract, so the rest of
// the member-initializer list and the opening brace of the body are not shown. Restore them
// from the original file; do not infer their text from this listing.
43  Path::Path(Path const& r)
44  : timesRun_(r.timesRun_),
48  state_(r.state_),
49  bitpos_(r.bitpos_),
50  trptr_(r.trptr_),
51  actReg_(r.actReg_),
53  workers_(r.workers_),
// Point every copied WorkerInPath at this object's own pathContext_.
59  for (auto& workerInPath : workers_) {
60  workerInPath.setPathContext(&pathContext_);
61  }
62  }
63 
// Reacts to a cms::Exception raised while a worker of this path was running. It returns
// should_continue, i.e. whether the path may go on, or leaves via e.raise() in the default
// branch.
// NOTE(review): listing lines 64, 80, 82, 87, 89, 97 and 100 are missing from this extract:
// the first line of the signature, the statement that defines `action`, what appear to be two
// case labels and an `if` header, one statement in the default branch, and the definition of
// `pNF`. Restore them from the original file before relying on this block.
65  int nwrwue,
66  bool isEvent,
67  bool begin,
68  BranchType branchType,
69  ModuleDescription const& desc,
70  std::string const& id) {
// Attach path-level context only when the exception does not carry any context yet.
71  if (e.context().empty()) {
72  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
73  }
74  bool should_continue = true;
75 
76  // there is no support as of yet for specific paths having
77  // different exception behavior
78 
79  // If not processing an event, always rethrow.
81  switch (action) {
// This branch stops the path (should_continue = false) and only prints a warning.
83  should_continue = false;
84  edm::printCmsExceptionWarning("FailPath", e);
85  break;
86  }
88  //Need the other Paths to stop as soon as possible
// Raises the flag pointed to by stopProcessingEvent_; should_continue is left unchanged here.
90  *stopProcessingEvent_ = true;
91  }
92  break;
93  }
94  default: {
// The exception counter is only advanced for event processing.
95  if (isEvent)
96  ++timesExcept_;
98  recordStatus(nwrwue, isEvent);
99  if (action == exception_actions::Rethrow) {
// When the category equals pNF, add a hint about the SkipEvent configuration option.
101  if (e.category() == pNF) {
102  std::ostringstream ost;
103  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
104  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
105  "configuration.\n";
106  e.addAdditionalInfo(ost.str());
107  }
108  }
109  //throw will copy which will slice the object
110  e.raise();
111  }
112  }
113 
114  return should_continue;
115  }
116 
118  bool isEvent,
119  bool begin,
120  BranchType branchType,
121  ModuleDescription const& desc,
122  std::string const& id,
123  PathContext const& pathContext) {
124  std::ostringstream ost;
125  ost << "Running path '" << pathContext.pathName() << "'";
126  ex.addContext(ost.str());
127  ost.str("");
128  ost << "Processing ";
129  //For the event case, the Worker has already
130  // added the necessary module context to the exception
131  if (begin && branchType == InRun) {
132  ost << "stream begin Run";
133  } else if (begin && branchType == InLumi) {
134  ost << "stream begin LuminosityBlock ";
135  } else if (!begin && branchType == InLumi) {
136  ost << "stream end LuminosityBlock ";
137  } else if (!begin && branchType == InRun) {
138  ost << "stream end Run ";
139  } else if (isEvent) {
140  // It should be impossible to get here ...
141  ost << "Event ";
142  }
143  ost << id;
144  ex.addContext(ost.str());
145  }
146 
147  void Path::recordStatus(int nwrwue, bool isEvent) {
148  if (isEvent && trptr_) {
149  (*trptr_)[bitpos_] = HLTPathStatus(state_, nwrwue);
150  }
151  }
152 
153  void Path::updateCounters(bool success, bool isEvent) {
154  if (success) {
155  if (isEvent)
156  ++timesPassed_;
157  state_ = hlt::Pass;
158  } else {
159  if (isEvent)
160  ++timesFailed_;
161  state_ = hlt::Fail;
162  }
163  }
164 
// Clears the per-worker statistics by invoking WorkerInPath::clearCounters on every element
// of workers_.
// NOTE(review): listing lines 165 and 167 are missing from this extract. Line 165 is the
// function header (the index below gives `void clearCounters()` at Path.cc:165). Line 167 is
// one statement ahead of the for_all call, presumably the reset of this path's own counters;
// confirm against the original file.
166  using std::placeholders::_1;
168  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
169  }
170 
171  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
172  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
173  // have to use the space necessary
174  std::vector<EarlyDeleteHelper*> temp;
175  temp.reserve(iWorkerToDeleter.size());
176  for (unsigned int index = 0; index != size(); ++index) {
177  auto found = iWorkerToDeleter.find(getWorker(index));
178  if (found != iWorkerToDeleter.end()) {
179  temp.push_back(found->second);
180  found->second->addedToPath();
181  }
182  }
183  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(), temp.end());
184  earlyDeleteHelpers_.swap(tempCorrectSize);
185  }
186 
187  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
188  pathStatusInserter_ = pathStatusInserter;
189  pathStatusInserterWorker_ = pathStatusInserterWorker;
190  }
191 
193  for (auto helper : earlyDeleteHelpers_) {
194  helper->pathFinished(iEvent);
195  }
196  }
197 
199  EventPrincipal const& iEP,
200  EventSetupImpl const& iES,
201  ServiceToken const& iToken,
202  StreamID const& iStreamID,
203  StreamContext const* iStreamContext) {
205  ++timesRun_;
206  waitingTasks_.add(iTask);
207  if (actReg_) {
208  ServiceRegistry::Operate guard(iToken);
209  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
210  }
211  state_ = hlt::Ready;
212 
213  if (workers_.empty()) {
214  ServiceRegistry::Operate guard(iToken);
215  finished(-1, true, std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
216  return;
217  }
218 
219  runNextWorkerAsync(0, iEP, iES, iToken, iStreamID, iStreamContext);
220  }
221 
// Continuation invoked once the worker at iModuleIndex has completed. It folds the worker's
// result, or its exception, into the path decision and then either schedules the next worker
// or finishes the path.
//
// iException   null when the worker completed without an exception, otherwise the exception
// iModuleIndex index into workers_ of the worker that just ran
// iEP, iES, iToken, iID, iContext are forwarded unchanged to the next step
222  void Path::workerFinished(std::exception_ptr const* iException,
223  unsigned int iModuleIndex,
224  EventPrincipal const& iEP,
225  EventSetupImpl const& iES,
226  ServiceToken const& iToken,
227  StreamID const& iID,
228  StreamContext const* iContext) {
229  ServiceRegistry::Operate guard(iToken);
230 
231  //This call also allows the WorkerInPath to update statistics
232  // so should be done even if an exception happened
233  auto& worker = workers_[iModuleIndex];
234  bool shouldContinue = worker.checkResultsOfRunWorker(true);
235  std::exception_ptr finalException;
236  if (iException) {
// Rethrow to recover the concrete exception and keep a clone of it.
// NOTE(review): only cms::Exception is caught here; any other exception type would leave
// this function from the rethrow below. Confirm that callers guarantee the type.
237  std::unique_ptr<cms::Exception> pEx;
238  try {
239  std::rethrow_exception(*iException);
240  } catch (cms::Exception& oldEx) {
241  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
242  }
// handleWorkerFailure either returns (the exception is absorbed) or throws, in which case
// the catch (...) below records the exception as the path's final one.
243  try {
244  std::ostringstream ost;
245  ost << iEP.id();
246  shouldContinue = handleWorkerFailure(*pEx,
247  iModuleIndex,
248  /*isEvent*/ true,
249  /*isBegin*/ true,
250  InEvent,
251  worker.getWorker()->description(),
252  ost.str());
253  //If we didn't rethrow, then we effectively skipped
254  worker.skipWorker(iEP);
255  finalException = std::exception_ptr();
256  } catch (...) {
257  shouldContinue = false;
258  finalException = std::current_exception();
259  //set the exception early to avoid case where another Path is waiting
260  // on a module in this Path and not running the module will lead to a
261  // different but related exception in the other Path. We want this
262  // Paths exception to be the one that gets reported.
263  waitingTasks_.presetTaskAsFailed(finalException);
264  }
265  }
// NOTE(review): listing line 266, the condition that guards the next statement, is missing
// from this extract. Restore it from the original file.
267  shouldContinue = false;
268  }
269  auto const nextIndex = iModuleIndex + 1;
// More workers remain and nothing asked to stop: hand over to the next worker.
270  if (shouldContinue and nextIndex < workers_.size()) {
271  runNextWorkerAsync(nextIndex, iEP, iES, iToken, iID, iContext);
272  return;
273  }
274 
275  if (not shouldContinue) {
276  //we are leaving the path early
// Mark every worker after this one as skipped, then notify the early-delete helpers.
277  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
278  it->skipWorker(iEP);
279  }
280  handleEarlyFinish(iEP);
281  }
282  finished(iModuleIndex, shouldContinue, finalException, iContext, iEP, iES, iID);
283  }
284 
// Completes the path for the current event: records the outcome, publishes the path status,
// emits the post-path signal and releases the tasks waiting on this path.
//
// iModuleIndex index passed on to recordStatus and HLTPathStatus
// iSucceeded   whether the path passed; only used when iException is empty
// iException   exception already associated with the path, if any (taken by value and
//              possibly replaced below)
285  void Path::finished(int iModuleIndex,
286  bool iSucceeded,
287  std::exception_ptr iException,
288  StreamContext const* iContext,
289  EventPrincipal const& iEP,
290  EventSetupImpl const& iES,
291  StreamID const& streamID) {
// Counters and the trigger-results slot are only updated when no exception is pending.
292  if (not iException) {
293  updateCounters(iSucceeded, true);
294  recordStatus(iModuleIndex, true);
295  }
296  try {
297  HLTPathStatus status(state_, iModuleIndex);
298 
299  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
300  pathStatusInserter_->setPathStatus(streamID, status);
301  }
// NOTE(review): listing line 303, the callee of the call whose arguments follow, is missing
// from this extract. Restore it from the original file.
302  std::exception_ptr jException =
304  iEP, iES, streamID, ParentContext(iContext), iContext);
// An exception that was already pending takes precedence over one produced here.
305  if (jException && not iException) {
306  iException = jException;
307  }
// NOTE(review): actReg_ is dereferenced here without the null check that
// processOneOccurrenceAsync applies before emitting the pre-path signal.
308  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
309  } catch (...) {
// Anything thrown above is reported to the waiting tasks unless an exception is already set.
310  if (not iException) {
311  iException = std::current_exception();
312  }
313  }
314  waitingTasks_.doneWaiting(iException);
315  }
316 
// Schedules the worker at iNextModuleIndex and arranges for workerFinished() to be called
// with the same index once that worker's task completes.
317  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
318  EventPrincipal const& iEP,
319  EventSetupImpl const& iES,
320  ServiceToken const& iToken,
321  StreamID const& iID,
322  StreamContext const* iContext) {
// The continuation captures iEP and iES by reference and copies the index, stream id,
// context pointer and service token.
// NOTE(review): the by-reference captures assume the principal and the event setup outlive
// the task; confirm with the caller.
323  auto nextTask = make_waiting_task(
324  tbb::task::allocate_root(),
325  [this, iNextModuleIndex, &iEP, &iES, iID, iContext, token = iToken](std::exception_ptr const* iException) {
326  this->workerFinished(iException, iNextModuleIndex, iEP, iES, token, iID, iContext);
327  });
328 
// NOTE(review): listing line 329, the callee of the call whose arguments follow, is missing
// from this extract. Restore it from the original file.
330  nextTask, iEP, iES, iToken, iID, iContext);
331  }
332 
333 } // namespace edm
std::string const & pathName() const
Definition: PathContext.h:30
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:147
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:192
Definition: helper.py:1
not [yet] run
Definition: HLTenums.h:17
roAction_t actions[nactions]
Definition: GenABIO.cc:181
static const std::string & codeToString(Code)
-------------- implementation details ------------------
Definition: EDMException.cc:52
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:120
int timesFailed_
Definition: Path.h:109
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:48
void add(WaitingTask *)
Adds task to the waiting list.
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1126
#define nullptr
EventID const & id() const
reject
Definition: HLTenums.h:19
void raise()
Definition: Exception.h:98
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:110
PathContext pathContext_
Definition: Path.h:122
size_type size() const
Definition: Path.h:91
WaitingTaskList waitingTasks_
Definition: Path.h:123
void reset()
Resets access to the resource so that added tasks will wait.
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:198
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:187
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:116
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
TrigResPtr trptr_
Definition: Path.h:115
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:50
accept
Definition: HLTenums.h:18
int bitpos_
Definition: Path.h:114
Definition: Path.h:44
std::list< std::string > const & context() const
Definition: Exception.cc:147
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
WorkersInPath workers_
Definition: Path.h:119
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:117
void presetTaskAsFailed(std::exception_ptr iExcept)
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:153
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:15
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void addContext(std::string const &context)
Definition: Exception.cc:165
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:64
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:171
ExceptionToActionTable const * act_table_
Definition: Path.h:117
State state_
Definition: Path.h:112
#define begin
Definition: vmac.h:32
HLT enums.
Worker const * getWorker(size_type i) const
Definition: Path.h:96
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetupImpl const &iES, StreamID const &streamID)
Definition: Path.cc:285
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:317
int timesPassed_
Definition: Path.h:108
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:124
int timesRun_
Definition: Path.h:107
Worker * pathStatusInserterWorker_
Definition: Path.h:127
void clearCounters()
Definition: Path.cc:165
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetupImpl const &iES, ServiceToken const &iToken, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:222
virtual Exception * clone() const
Definition: Exception.cc:181
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:126