CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
14 
15 #include <algorithm>
16 
17 namespace edm {
// Primary constructor. Stores the trigger-bit position, trigger-result pointer, activity
// registry and stop-processing flag, copies the worker list, and builds pathContext_ from
// the path name, stream context, bit position and path type.
// NOTE(review): this listing jumps from source line 21 to 23, and `actions` (used below to
// initialize act_table_) is not among the visible parameters; the cross-reference index lists
// an `ExceptionToActionTable const &actions` parameter after `trptr` -- confirm against Path.h.
18  Path::Path(int bitpos,
19  std::string const& path_name,
20  WorkersInPath const& workers,
21  TrigResPtr trptr,
23  std::shared_ptr<ActivityRegistry> areg,
24  StreamContext const* streamContext,
25  std::atomic<bool>* stopProcessingEvent,
26  PathContext::PathType pathType)
    // The four run/pass/fail/exception counters are value-initialized.
27  : timesRun_(),
28  timesPassed_(),
29  timesFailed_(),
30  timesExcept_(),
    // Starts at workers.size(), i.e. one past the last valid worker index.
31  failedModuleIndex_(workers.size()),
32  state_(hlt::Ready),
33  bitpos_(bitpos),
34  trptr_(trptr),
35  actReg_(areg),
36  act_table_(&actions),
37  workers_(workers),
38  pathContext_(path_name, streamContext, bitpos, pathType),
39  stopProcessingEvent_(stopProcessingEvent),
40  pathStatusInserter_(nullptr),
41  pathStatusInserterWorker_(nullptr) {
    // Point every copied WorkerInPath at this Path's own context object.
42  for (auto& workerInPath : workers_) {
43  workerInPath.setPathContext(&pathContext_);
44  }
45  modulesToRun_ = workers_.size();
46  }
47 
48  Path::Path(Path const& r)
49  : timesRun_(r.timesRun_),
50  timesPassed_(r.timesPassed_),
51  timesFailed_(r.timesFailed_),
52  timesExcept_(r.timesExcept_),
53  failedModuleIndex_(r.failedModuleIndex_),
54  state_(r.state_),
55  bitpos_(r.bitpos_),
56  trptr_(r.trptr_),
57  actReg_(r.actReg_),
58  act_table_(r.act_table_),
59  workers_(r.workers_),
60  pathContext_(r.pathContext_),
61  stopProcessingEvent_(r.stopProcessingEvent_),
62  pathStatusInserter_(r.pathStatusInserter_),
63  pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
64  for (auto& workerInPath : workers_) {
65  workerInPath.setPathContext(&pathContext_);
66  }
67  modulesToRun_ = workers_.size();
68  }
69 
// Remainder of Path::handleWorkerFailure. Per the cross-reference index the full signature is
//   bool handleWorkerFailure(cms::Exception& e, int nwrwue, bool isEvent, bool begin,
//                            BranchType branchType, ModuleDescription const&, std::string const& id) const
// (its first line, source line 70, is not shown in this listing).
// Returns should_continue: true unless the branch that logs "FailPath" clears it; the default
// branch does not return because it re-raises the exception.
// NOTE(review): source lines 86, 88, 93, 95, 101 and 102 are also absent here, so the
// definitions of `action` and `pNF` and the case labels are not visible -- consult the
// repository copy before editing this function.
71  int nwrwue,
72  bool isEvent,
73  bool begin,
74  BranchType branchType,
75  ModuleDescription const& desc,
76  std::string const& id) const {
    // Attach path-level context only when the exception carries no context yet.
77  if (e.context().empty()) {
78  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
79  }
80  bool should_continue = true;
81 
82  // there is no support as of yet for specific paths having
83  // different exception behavior
84 
85  // If not processing an event, always rethrow.
87  switch (action) {
    // NOTE(review): presumably the FailPath action case (label not shown): the exception is
    // only logged as a warning and the caller is told not to continue.
89  should_continue = false;
90  edm::printCmsExceptionWarning("FailPath", e);
91  break;
92  }
94  //Need the other Paths to stop as soon as possible
96  *stopProcessingEvent_ = true;
97  }
98  break;
99  }
100  default: {
    // When the category equals pNF, append a configuration hint to the exception.
103  if (e.category() == pNF) {
104  std::ostringstream ost;
105  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
106  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
107  "configuration.\n";
108  e.addAdditionalInfo(ost.str());
109  }
110  }
111  //throw will copy which will slice the object
112  e.raise();
113  }
114  }
115 
116  return should_continue;
117  }
118 
120  bool isEvent,
121  bool begin,
122  BranchType branchType,
123  ModuleDescription const& desc,
124  std::string const& id,
125  PathContext const& pathContext) {
126  std::ostringstream ost;
127  ost << "Running path '" << pathContext.pathName() << "'";
128  ex.addContext(ost.str());
129  ost.str("");
130  ost << "Processing ";
131  //For the event case, the Worker has already
132  // added the necessary module context to the exception
133  if (begin && branchType == InRun) {
134  ost << "stream begin Run";
135  } else if (begin && branchType == InLumi) {
136  ost << "stream begin LuminosityBlock ";
137  } else if (!begin && branchType == InLumi) {
138  ost << "stream end LuminosityBlock ";
139  } else if (!begin && branchType == InRun) {
140  ost << "stream end Run ";
141  } else if (isEvent) {
142  // It should be impossible to get here ...
143  ost << "Event ";
144  }
145  ost << id;
146  ex.addContext(ost.str());
147  }
148 
149  void Path::threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr iExcept) {
150  bool expected = false;
151  while (stateLock_.compare_exchange_strong(expected, true)) {
152  expected = false;
153  }
154  if (iExcept) {
155  if (state_ == hlt::Exception) {
156  if (nwrwue < failedModuleIndex_) {
157  failedModuleIndex_ = nwrwue;
158  }
159  } else {
161  failedModuleIndex_ = nwrwue;
162  }
163  } else {
164  if (state_ != hlt::Exception) {
165  if (nwrwue < failedModuleIndex_) {
166  failedModuleIndex_ = nwrwue;
167  }
168  state_ = hlt::Fail;
169  }
170  }
171 
172  stateLock_ = false;
173  }
174 
// Body of Path::recordStatus(int nwrwue, hlt::HLTState state); the signature line (source
// line 175) is not shown in this listing. When a trigger-results object is attached, stores
// HLTPathStatus(state, nwrwue) at this path's bit position; otherwise does nothing.
176  if (trptr_) {
177  trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
178  }
179  }
180 
// Body of Path::updateCounters(hlt::HLTState state); the signature line (source line 181) is
// not shown in this listing. Increments the counter matching the final path state; any other
// state leaves the counters untouched.
182  switch (state) {
183  case hlt::Pass: {
184  ++timesPassed_;
185  break;
186  }
187  case hlt::Fail: {
188  ++timesFailed_;
189  break;
190  }
191  case hlt::Exception: {
192  ++timesExcept_;
    // No break here: control falls into the empty default below, which does nothing.
193  }
194  default:;
195  }
196  }
197 
// Body of Path::clearCounters(); the signature line (source line 198) and source line 200 are
// not shown in this listing. The visible part calls WorkerInPath::clearCounters on every entry
// of workers_ (for_all is the project's wrapper for std::for_each).
199  using std::placeholders::_1;
201  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
202  }
203 
204  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
205  for (unsigned int index = 0; index != size(); ++index) {
206  auto found = iWorkerToDeleter.find(getWorker(index));
207  if (found != iWorkerToDeleter.end()) {
208  found->second->addedToPath();
209  }
210  }
211  }
212 
213  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
214  pathStatusInserter_ = pathStatusInserter;
215  pathStatusInserterWorker_ = pathStatusInserterWorker;
216  }
217 
// Remainder of Path::processOneOccurrenceAsync. Per the cross-reference index the signature is
//   void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const&,
//                                  ServiceToken const&, StreamID const&, StreamContext const*)
// (its first line, source line 218, and source line 223 are not shown in this listing).
// Starts one pass over the path: registers iTask on waitingTasks_, resets the per-pass
// bookkeeping and launches the first worker, or finishes immediately when the path is empty.
219  EventTransitionInfo const& iInfo,
220  ServiceToken const& iToken,
221  StreamID const& iStreamID,
222  StreamContext const* iStreamContext) {
224  modulesToRun_ = workers_.size();
225  ++timesRun_;
226  waitingTasks_.add(iTask);
    // The pre-path signal is emitted only when an activity registry is attached.
227  if (actReg_) {
228  ServiceRegistry::Operate guard(iToken);
229  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
230  }
231  //If the Path succeeds, these are the values we have at the end
232  state_ = hlt::Pass;
    // NOTE(review): for an empty worker list size() - 1 wraps in unsigned arithmetic before the
    // conversion to the int member -- confirm the resulting value is what finished() expects.
233  failedModuleIndex_ = workers_.size() - 1;
234 
    // Nothing to run: report completion right away with no exception.
235  if (workers_.empty()) {
236  ServiceRegistry::Operate guard(iToken);
237  finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
238  return;
239  }
240 
241  runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
242  }
243 
// Called when the worker at iModuleIndex has completed (iException is non-null when it ended
// with an exception). Decides whether the path continues with the next worker, records
// failure information when it does not, skips the remaining workers on an early exit, and
// calls finished() once modulesToRun_ reaches zero.
// NOTE(review): source line 296 is absent from this listing; it presumably opens the
// conditional whose body is the `shouldContinue = false;` at listing line 297 -- confirm.
244  void Path::workerFinished(std::exception_ptr const* iException,
245  unsigned int iModuleIndex,
246  EventTransitionInfo const& iInfo,
247  ServiceToken const& iToken,
248  StreamID const& iID,
249  StreamContext const* iContext,
250  oneapi::tbb::task_group& iGroup) {
251  EventPrincipal const& iEP = iInfo.principal();
252  ServiceRegistry::Operate guard(iToken);
253 
254  //This call also allows the WorkerInPath to update statistics
255  // so should be done even if an exception happened
256  auto& worker = workers_[iModuleIndex];
257  bool shouldContinue = worker.checkResultsOfRunWorker(true);
258  std::exception_ptr finalException;
259  if (iException) {
    // Normalize whatever was thrown into a cms::Exception: clone a cms::Exception, otherwise
    // substitute an edm::Exception with the StdException or Unknown code.
260  std::unique_ptr<cms::Exception> pEx;
261  try {
262  std::rethrow_exception(*iException);
263  } catch (cms::Exception& oldEx) {
264  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
265  } catch (std::exception const& oldEx) {
266  pEx = std::make_unique<edm::Exception>(errors::StdException);
267  } catch (...) {
268  pEx = std::make_unique<edm::Exception>(errors::Unknown);
269  }
270  // Caught exception is propagated via WaitingTaskList
271  CMS_SA_ALLOW try {
272  std::ostringstream ost;
273  ost << iEP.id();
274  ModuleDescription const* desc = worker.getWorker()->description();
275  assert(desc != nullptr);
    // handleWorkerFailure either returns whether to continue or re-raises the exception, in
    // which case control goes to the catch below.
276  shouldContinue = handleWorkerFailure(*pEx,
277  iModuleIndex,
278  /*isEvent*/ true,
279  /*isBegin*/ true,
280  InEvent,
281  *desc,
282  ost.str());
283  //If we didn't rethrow, then we effectively skipped
284  worker.skipWorker(iEP);
285  finalException = std::exception_ptr();
286  } catch (...) {
287  shouldContinue = false;
288  finalException = std::current_exception();
289  //set the exception early to avoid case where another Path is waiting
290  // on a module in this Path and not running the module will lead to a
291  // different but related exception in the other Path. We want this
292  // Paths exception to be the one that gets reported.
293  waitingTasks_.presetTaskAsFailed(finalException);
294  }
295  }
297  shouldContinue = false;
298  }
299  auto const nextIndex = iModuleIndex + 1;
    // Continue with the next worker only when this worker was not run concurrently.
300  if (shouldContinue and nextIndex < workers_.size()) {
301  if (not worker.runConcurrently()) {
302  --modulesToRun_;
303  runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
304  return;
305  }
306  }
307 
308  if (not shouldContinue) {
309  threadsafe_setFailedModuleInfo(iModuleIndex, finalException);
310  }
311  if (not shouldContinue and not worker.runConcurrently()) {
312  //we are leaving the path early
    // Every worker after this one is counted as done and marked skipped.
313  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
314  --modulesToRun_;
315  it->skipWorker(iEP);
316  }
317  }
318  if (--modulesToRun_ == 0) {
319  //The path should only be marked as finished once all outstanding modules finish
320  finished(finalException, iContext, iInfo, iID);
321  }
322  }
323 
// Finalizes one pass over the path: records the final status, runs the visible
// status-reporting steps, emits the post-path signal, and releases the tasks waiting on
// waitingTasks_ with iException (null when nothing failed).
// An exception raised inside the try block is kept only if no earlier exception is set.
// NOTE(review): source lines 328, 336 and 339 are absent from this listing, so the body of the
// pathStatusInserter_ check and the call that produces jException are not visible here.
324  void Path::finished(std::exception_ptr iException,
325  StreamContext const* iContext,
326  EventTransitionInfo const& iInfo,
327  StreamID const& streamID) {
329  auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
330  recordStatus(failedModuleBitPosition, state_);
331  // Caught exception is propagated via WaitingTaskList
332  CMS_SA_ALLOW try {
333  HLTPathStatus status(state_, failedModuleBitPosition);
334 
335  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
337  }
338  std::exception_ptr jException =
340  iInfo, streamID, ParentContext(iContext), iContext);
    // The first exception wins: jException is adopted only when none was passed in.
341  if (jException && not iException) {
342  iException = jException;
343  }
    // NOTE(review): actReg_ is dereferenced without the null check that
    // processOneOccurrenceAsync applies before prePathEventSignal_ -- confirm it cannot be null.
344  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
345  } catch (...) {
346  if (not iException) {
347  iException = std::current_exception();
348  }
349  }
350  waitingTasks_.doneWaiting(iException);
351  }
352 
// Launches the worker at iNextModuleIndex together with the following workers that can run
// concurrently with it. For each launched index a waiting task is created that calls
// workerFinished() for that index when the worker completes.
// NOTE(review): source line 371 is absent from this listing; it presumably begins the call
// whose argument list continues on listing line 372 -- confirm against the repository copy.
353  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
354  EventTransitionInfo const& iInfo,
355  ServiceToken const& iToken,
356  StreamID const& iID,
357  StreamContext const* iContext,
358  oneapi::tbb::task_group& iGroup) {
359  //Figure out which next modules can run concurrently
360  const int firstModuleIndex = iNextModuleIndex;
361  int lastModuleIndex = firstModuleIndex;
    // Extend the range while the current worker is concurrent and is not the last worker.
362  while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
363  ++lastModuleIndex;
364  }
    // Walk the range from its last index down to its first.
365  for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
    // The task holds a weak service token and a copy of iInfo; iGroup is captured by reference.
366  ServiceWeakToken weakToken = iToken;
367  auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
368  std::exception_ptr const* iException) {
369  this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
370  });
372  WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
373  }
374  }
375 
376 } // namespace edm
size
Write out results.
Worker const * getWorker(size_type i) const
Definition: Path.h:89
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1182
TrigResPtr const trptr_
Definition: Path.h:109
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
Definition: Path.cc:70
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< unsigned int > modulesToRun_
Definition: Path.h:119
void recordStatus(int nwrwue, hlt::HLTState state)
Definition: Path.cc:175
roAction_t actions[nactions]
Definition: GenABIO.cc:181
static const std::string & codeToString(Code)
------------ implementation details ------------
Definition: EDMException.cc:54
int timesFailed_
Definition: Path.h:99
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:45
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
Definition: Path.cc:149
EventPrincipal & principal()
HLTState
status of a trigger path
Definition: HLTenums.h:16
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Definition: Path.cc:353
reject
Definition: HLTenums.h:19
void setPathStatus(StreamID const &, HLTPathStatus const &)
int timesExcept_
Definition: Path.h:100
PathContext pathContext_
Definition: Path.h:116
WaitingTaskList waitingTasks_
Definition: Path.h:117
void reset()
Resets access to the resource so that added tasks will wait.
int const bitpos_
Definition: Path.h:108
assert(be >=bs)
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:213
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Definition: Path.cc:244
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
oneapi::tbb::task_group * group() const noexcept
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper *> const &)
Definition: Path.cc:204
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:47
accept
Definition: HLTenums.h:18
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
Definition: Path.cc:324
Definition: Path.h:41
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void updateCounters(hlt::HLTState state)
Definition: Path.cc:181
std::atomic< bool > *const stopProcessingEvent_
Definition: Path.h:118
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
ServiceToken lock() const
Definition: ServiceToken.h:101
ExceptionToActionTable const *const act_table_
Definition: Path.h:112
WorkersInPath workers_
Definition: Path.h:114
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:119
void presetTaskAsFailed(std::exception_ptr iExcept)
exception_actions::ActionCodes find(const std::string &category) const
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:18
int failedModuleIndex_
Definition: Path.h:105
size_type size() const
Definition: Path.h:84
void addContext(std::string const &context)
Definition: Exception.cc:165
std::shared_ptr< ActivityRegistry > const actReg_
Definition: Path.h:111
State state_
Definition: Path.h:106
HLT enums.
int bitPosition() const
Definition: Path.h:73
std::string const & pathName() const
Definition: PathContext.h:30
EventID const & id() const
std::atomic< bool > stateLock_
Definition: Path.h:104
int timesPassed_
Definition: Path.h:98
virtual Exception * clone() const
Definition: Exception.cc:181
int timesRun_
Definition: Path.h:97
Worker * pathStatusInserterWorker_
Definition: Path.h:122
void clearCounters()
Definition: Path.cc:198
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:218
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:121