CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
14 
15 #include <algorithm>
16 #include <sstream>
17 
18 namespace edm {
// Constructs a Path occupying trigger-bit position `bitpos` and named `path_name`.
// The run/pass/fail/exception counters are value-initialized, failedModuleIndex_
// starts at workers.size(), and state_ starts at hlt::Ready. The worker list is
// copied into workers_, after which every WorkerInPath is given a pointer to this
// object's pathContext_ and modulesToRun_ is set to the number of workers.
19  Path::Path(int bitpos,
20  std::string const& path_name,
21  WorkersInPath const& workers,
22  TrigResPtr trptr,
    // NOTE(review): listing line 23 is absent from this extract. The initializer on
    // listing line 36 takes the address of a name `actions`; the signature shown in the
    // cross-reference index at the end of this page lists an
    // `ExceptionToActionTable const& actions` parameter at this position -- confirm
    // against the real source file.
24  std::shared_ptr<ActivityRegistry> areg,
25  StreamContext const* streamContext,
26  PathContext::PathType pathType)
27  : timesRun_(),
28  timesPassed_(),
29  timesFailed_(),
30  timesExcept_(),
31  failedModuleIndex_(workers.size()),
32  state_(hlt::Ready),
33  bitpos_(bitpos),
34  trptr_(trptr),
35  actReg_(areg),
36  act_table_(&actions),
37  workers_(workers),
38  pathContext_(path_name, streamContext, bitpos, pathType),
    // The status inserter and its worker are unset until setPathStatusInserter() is called.
39  pathStatusInserter_(nullptr),
40  pathStatusInserterWorker_(nullptr) {
    // Point each worker at the PathContext owned by this Path object.
41  for (auto& workerInPath : workers_) {
42  workerInPath.setPathContext(&pathContext_);
43  }
44  modulesToRun_ = workers_.size();
45  }
46 
// Copy constructor. Counters, state, bit position, the trigger-result pointer, the
// activity registry, the action table, the worker list, the path context and the
// status-inserter pointers are all taken from `r`.
// Members that do not appear in the initializer list (for example waitingTasks_,
// stateLock_ and printedException_) are not taken from `r` here.
47  Path::Path(Path const& r)
48  : timesRun_(r.timesRun_),
49  timesPassed_(r.timesPassed_),
50  timesFailed_(r.timesFailed_),
51  timesExcept_(r.timesExcept_),
52  failedModuleIndex_(r.failedModuleIndex_),
53  state_(r.state_),
54  bitpos_(r.bitpos_),
55  trptr_(r.trptr_),
56  actReg_(r.actReg_),
57  act_table_(r.act_table_),
58  workers_(r.workers_),
59  pathContext_(r.pathContext_),
60  pathStatusInserter_(r.pathStatusInserter_),
61  pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
    // Re-point the copied workers at this object's own pathContext_
    // (presumably so they do not keep referring to the context inside `r` -- confirm).
62  for (auto& workerInPath : workers_) {
63  workerInPath.setPathContext(&pathContext_);
64  }
    // modulesToRun_ is recomputed from the worker count instead of being copied from `r`.
65  modulesToRun_ = workers_.size();
66  }
67 
69  if (e.context().empty()) {
70  exceptionContext(e, true /*isEvent*/, true /*begin*/, InEvent /*branchType*/, desc, id, pathContext_);
71  }
72  // there is no support as of yet for specific paths having
73  // different exception behavior
74 
75  // If not processing an event, always rethrow.
77  switch (action) {
79  bool expected = false;
80  if (printedException_.compare_exchange_strong(expected, true)) {
81  std::ostringstream s;
82  s << "Path " << name() << " applying TryToContinue on";
83  edm::printCmsExceptionWarning(s.str().c_str(), e);
84  }
85  break;
86  }
87  default: {
90  if (e.category() == pNF) {
91  std::ostringstream ost;
92  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
93  << "add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
94  "configuration.\n";
95  e.addAdditionalInfo(ost.str());
96  }
97  }
98  //throw will copy which will slice the object
99  e.raise();
100  }
101  }
102  }
103 
105  bool isEvent,
106  bool begin,
107  BranchType branchType,
108  ModuleDescription const& desc,
109  std::string const& id,
110  PathContext const& pathContext) {
111  std::ostringstream ost;
112  ost << "Running path '" << pathContext.pathName() << "'";
113  ex.addContext(ost.str());
114  ost.str("");
115  ost << "Processing ";
116  //For the event case, the Worker has already
117  // added the necessary module context to the exception
118  if (begin && branchType == InRun) {
119  ost << "stream begin Run";
120  } else if (begin && branchType == InLumi) {
121  ost << "stream begin LuminosityBlock ";
122  } else if (!begin && branchType == InLumi) {
123  ost << "stream end LuminosityBlock ";
124  } else if (!begin && branchType == InRun) {
125  ost << "stream end Run ";
126  } else if (isEvent) {
127  // It should be impossible to get here ...
128  ost << "Event ";
129  }
130  ost << id;
131  ex.addContext(ost.str());
132  }
133 
// Records which worker index (`nwrwue`) caused the path to stop, and whether that
// was due to an exception (`iExcept`). Updates to state_ and failedModuleIndex_ are
// serialized by spinning on the atomic flag stateLock_.
134  void Path::threadsafe_setFailedModuleInfo(int nwrwue, bool iExcept) {
135  bool expected = false;
    // Spin until stateLock_ is switched from false to true by this caller.
    // compare_exchange_strong overwrites `expected` on failure, so it is reset each round.
136  while (not stateLock_.compare_exchange_strong(expected, true)) {
137  expected = false;
138  }
139  if (iExcept) {
140  if (state_ == hlt::Exception) {
    // An exception was already recorded: keep the smaller of the two indices.
141  if (nwrwue < failedModuleIndex_) {
142  failedModuleIndex_ = nwrwue;
143  }
144  } else {
    // NOTE(review): listing line 145 is absent from this extract; only the index
    // assignment below is visible in this branch -- check the real source for the
    // missing statement.
146  failedModuleIndex_ = nwrwue;
147  }
148  } else {
    // A non-exception failure never replaces an already recorded exception.
149  if (state_ != hlt::Exception) {
150  if (nwrwue < failedModuleIndex_) {
151  failedModuleIndex_ = nwrwue;
152  }
153  state_ = hlt::Fail;
154  }
155  }
156 
    // Release the spin flag.
157  stateLock_ = false;
158  }
159 
161  if (trptr_) {
162  trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
163  }
164  }
165 
167  switch (state) {
168  case hlt::Pass: {
169  ++timesPassed_;
170  break;
171  }
172  case hlt::Fail: {
173  ++timesFailed_;
174  break;
175  }
176  case hlt::Exception: {
177  ++timesExcept_;
178  }
179  default:;
180  }
181  }
182 
184  using std::placeholders::_1;
186  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
187  }
188 
// For every worker position in this path, looks up the worker returned by
// getWorker(index) in `iWorkerToDeleter`; when an entry exists, calls addedToPath()
// on the mapped EarlyDeleteHelper. Workers without an entry are ignored.
// A worker found at several positions triggers one addedToPath() call per position.
189  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
190  for (unsigned int index = 0; index != size(); ++index) {
191  auto found = iWorkerToDeleter.find(getWorker(index));
192  if (found != iWorkerToDeleter.end()) {
193  found->second->addedToPath();
194  }
195  }
196  }
197 
// Stores the PathStatusInserter and its associated Worker in this Path's
// pathStatusInserter_ and pathStatusInserterWorker_ members. Both pointers are
// stored as given; no null check is performed here.
198  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
199  pathStatusInserter_ = pathStatusInserter;
200  pathStatusInserterWorker_ = pathStatusInserterWorker;
201  }
202 
204  EventTransitionInfo const& iInfo,
205  ServiceToken const& iToken,
206  StreamID const& iStreamID,
207  StreamContext const* iStreamContext) {
209  modulesToRun_ = workers_.size();
210  ++timesRun_;
211  waitingTasks_.add(iTask);
212  printedException_ = false;
213  if (actReg_) {
214  ServiceRegistry::Operate guard(iToken);
215  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
216  }
217  //If the Path succeeds, these are the values we have at the end
218  state_ = hlt::Pass;
219  failedModuleIndex_ = workers_.size() - 1;
220 
221  if (workers_.empty()) {
222  ServiceRegistry::Operate guard(iToken);
223  finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
224  return;
225  }
226 
227  runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
228  }
229 
// Completion handler for the worker at position `iModuleIndex`. It is the function
// called by the waiting task that runNextWorkerAsync() creates for each worker.
// A non-null `iException` is treated as a failure of that worker. The function
// updates the worker's results, decides whether the next worker must be started,
// records failure information, and calls finished() once modulesToRun_ reaches zero.
230  void Path::workerFinished(std::exception_ptr const* iException,
231  unsigned int iModuleIndex,
232  EventTransitionInfo const& iInfo,
233  ServiceToken const& iToken,
234  StreamID const& iID,
235  StreamContext const* iContext,
236  oneapi::tbb::task_group& iGroup) {
237  EventPrincipal const& iEP = iInfo.principal();
    // The guard built from iToken stays alive for the whole function body.
238  ServiceRegistry::Operate guard(iToken);
239 
240  //This call also allows the WorkerInPath to update statistics
241  // so should be done even if an exception happened
242  auto& worker = workers_[iModuleIndex];
243  bool shouldContinue = worker.checkResultsOfRunWorker(true);
    // Stays empty unless handleWorkerFailure() below throws.
244  std::exception_ptr finalException;
245  if (iException) {
246  shouldContinue = false;
    // Re-throw the stored exception only to learn its dynamic type: a cms::Exception
    // is cloned, any other std::exception becomes an edm::Exception with code
    // errors::StdException, and everything else one with errors::Unknown.
247  std::unique_ptr<cms::Exception> pEx;
248  try {
249  std::rethrow_exception(*iException);
250  } catch (cms::Exception& oldEx) {
251  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
252  } catch (std::exception const& oldEx) {
253  pEx = std::make_unique<edm::Exception>(errors::StdException);
254  } catch (...) {
255  pEx = std::make_unique<edm::Exception>(errors::Unknown);
256  }
257  // Caught exception is propagated via WaitingTaskList
258  CMS_SA_ALLOW try {
    // The event id is streamed into a string and passed as the `id` argument.
259  std::ostringstream ost;
260  ost << iEP.id();
261  ModuleDescription const* desc = worker.getWorker()->description();
262  assert(desc != nullptr);
263  handleWorkerFailure(*pEx, iModuleIndex, *desc, ost.str());
264  //If we didn't rethrow, then we effectively skipped
265  worker.skipWorker(iEP);
266  } catch (...) {
267  finalException = std::current_exception();
268  //set the exception early to avoid case where another Path is waiting
269  // on a module in this Path and not running the module will lead to a
270  // different but related exception in the other Path. We want this
271  // Paths exception to be the one that gets reported.
272  waitingTasks_.presetTaskAsFailed(finalException);
273  }
274  }
275  auto const nextIndex = iModuleIndex + 1;
    // Only a worker that is not flagged runConcurrently() starts its successor here.
    // runNextWorkerAsync() launches a run of runConcurrently() workers together with
    // the worker that follows them, so a concurrent worker's successor has already
    // been started.
276  if (shouldContinue and nextIndex < workers_.size()) {
277  if (not worker.runConcurrently()) {
278  --modulesToRun_;
279  runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
280  return;
281  }
282  }
283 
    // Record the stopping worker; the second argument says whether an exception caused it.
284  if (not shouldContinue) {
285  threadsafe_setFailedModuleInfo(iModuleIndex, iException != nullptr);
286  }
287  if (not shouldContinue and not worker.runConcurrently()) {
288  //we are leaving the path early
    // Every remaining worker is marked skipped and removed from the outstanding count.
289  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
290  --modulesToRun_;
291  it->skipWorker(iEP);
292  }
293  }
    // Account for the worker that just finished.
294  if (--modulesToRun_ == 0) {
295  //The path should only be marked as finished once all outstanding modules finish
296  finished(finalException, iContext, iInfo, iID);
297  }
298  }
299 
// Finalizes the path for the current event: records the final status, emits the
// post-path signal on actReg_, and releases waitingTasks_ with `iException`
// (possibly replaced below when it was empty on entry).
// NOTE(review): listing lines 304, 312, 314 and 316 are absent from this extract, so
// the statements around pathStatusInserter_ and the call producing `jException` are
// only partly visible -- consult the real source before relying on this text.
300  void Path::finished(std::exception_ptr iException,
301  StreamContext const* iContext,
302  EventTransitionInfo const& iInfo,
303  StreamID const& streamID) {
305  auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
306  recordStatus(failedModuleBitPosition, state_);
307  // Caught exception is propagated via WaitingTaskList
308  CMS_SA_ALLOW try {
309  HLTPathStatus status(state_, failedModuleBitPosition);
310 
311  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
313  }
315  std::exception_ptr jException =
317  iInfo, streamID, ParentContext(iContext), iContext);
    // An exception already passed in takes precedence over the one produced here.
318  if (jException && not iException) {
319  iException = jException;
320  }
321  }
322  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
323  } catch (...) {
    // Anything thrown inside the try block is kept only if no exception is held yet.
324  if (not iException) {
325  iException = std::current_exception();
326  }
327  }
328  waitingTasks_.doneWaiting(iException);
329  }
330 
// Starts the worker at `iNextModuleIndex` together with any workers after it that
// belong to the same concurrent batch. For each index in the batch a waiting task
// is created whose callback calls workerFinished() for that index.
331  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
332  EventTransitionInfo const& iInfo,
333  ServiceToken const& iToken,
334  StreamID const& iID,
335  StreamContext const* iContext,
336  oneapi::tbb::task_group& iGroup) {
337  //Figure out which next modules can run concurrently
338  const int firstModuleIndex = iNextModuleIndex;
339  int lastModuleIndex = firstModuleIndex;
    // Extend the batch while the worker at lastModuleIndex reports runConcurrently()
    // and is not the final worker. The batch therefore ends at the first worker that
    // is not concurrent, or at the last worker of the path.
340  while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
341  ++lastModuleIndex;
342  }
    // Walk the batch from its last index down to its first.
343  for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
    // The callback captures a weak token and converts it back with lock() when it runs.
    // iInfo is copied into the closure as `info`, and iGroup is captured by reference.
344  ServiceWeakToken weakToken = iToken;
345  auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
346  std::exception_ptr const* iException) {
347  this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
348  });
    // NOTE(review): listing line 349 is absent from this extract; it holds the start of
    // the call that receives the WaitingTaskHolder built on the next line.
350  WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
351  }
352  }
353 
354 } // namespace edm
Worker const * getWorker(size_type i) const
Definition: Path.h:88
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1259
TrigResPtr const trptr_
Definition: Path.h:109
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< unsigned int > modulesToRun_
Definition: Path.h:118
void recordStatus(int nwrwue, hlt::HLTState state)
Definition: Path.cc:160
roAction_t actions[nactions]
Definition: GenABIO.cc:181
static const std::string & codeToString(Code)
------------ implementation details ----------------
Definition: EDMException.cc:55
int timesFailed_
Definition: Path.h:98
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:45
EventPrincipal & principal()
HLTState
status of a trigger path
Definition: HLTenums.h:16
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Definition: Path.cc:331
reject
Definition: HLTenums.h:19
void setPathStatus(StreamID const &, HLTPathStatus const &)
int timesExcept_
Definition: Path.h:99
PathContext pathContext_
Definition: Path.h:116
WaitingTaskList waitingTasks_
Definition: Path.h:117
void reset()
Resets access to the resource so that added tasks will wait.
int const bitpos_
Definition: Path.h:108
assert(be >=bs)
void handleWorkerFailure(cms::Exception &e, int nwrwue, ModuleDescription const &, std::string const &id)
Definition: Path.cc:68
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:198
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, oneapi::tbb::task_group &iGroup)
Definition: Path.cc:230
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
oneapi::tbb::task_group * group() const noexcept
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper *> const &)
Definition: Path.cc:189
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:47
accept
Definition: HLTenums.h:18
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
Definition: Path.cc:300
Definition: Path.h:41
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
void updateCounters(hlt::HLTState state)
Definition: Path.cc:166
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
ServiceToken lock() const
Definition: ServiceToken.h:101
ExceptionToActionTable const *const act_table_
Definition: Path.h:112
WorkersInPath workers_
Definition: Path.h:114
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:104
void presetTaskAsFailed(std::exception_ptr iExcept)
exception_actions::ActionCodes find(const std::string &category) const
int failedModuleIndex_
Definition: Path.h:105
size_type size() const
Definition: Path.h:83
void addContext(std::string const &context)
Definition: Exception.cc:169
std::shared_ptr< ActivityRegistry > const actReg_
Definition: Path.h:111
State state_
Definition: Path.h:106
HLT enums.
std::atomic< bool > printedException_
Definition: Path.h:101
int bitPosition() const
Definition: Path.h:72
std::string const & pathName() const
Definition: PathContext.h:30
EventID const & id() const
std::string const & name() const
Definition: Path.h:73
std::atomic< bool > stateLock_
Definition: Path.h:104
int timesPassed_
Definition: Path.h:97
virtual Exception * clone() const
Definition: Exception.cc:185
int timesRun_
Definition: Path.h:96
Worker * pathStatusInserterWorker_
Definition: Path.h:121
void clearCounters()
Definition: Path.cc:183
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:203
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, PathContext::PathType pathType)
Definition: Path.cc:19
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
void threadsafe_setFailedModuleInfo(int nwrwue, bool iExceptionHappened)
Definition: Path.cc:134
PathStatusInserter * pathStatusInserter_
Definition: Path.h:120