CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
Path.cc
Go to the documentation of this file.
1 
14 
15 #include <algorithm>
16 
17 namespace edm {
18  Path::Path(int bitpos,
19  std::string const& path_name,
20  WorkersInPath const& workers,
21  TrigResPtr trptr,
23  std::shared_ptr<ActivityRegistry> areg,
24  StreamContext const* streamContext,
25  std::atomic<bool>* stopProcessingEvent,
26  PathContext::PathType pathType)
27  : timesRun_(),
28  timesPassed_(),
29  timesFailed_(),
30  timesExcept_(),
31  failedModuleIndex_(workers.size()),
32  state_(hlt::Ready),
33  bitpos_(bitpos),
34  trptr_(trptr),
35  actReg_(areg),
36  act_table_(&actions),
37  workers_(workers),
38  pathContext_(path_name, streamContext, bitpos, pathType),
39  stopProcessingEvent_(stopProcessingEvent),
40  pathStatusInserter_(nullptr),
41  pathStatusInserterWorker_(nullptr) {
42  for (auto& workerInPath : workers_) {
43  workerInPath.setPathContext(&pathContext_);
44  }
45  modulesToRun_ = workers_.size();
46  }
47 
48  Path::Path(Path const& r)
49  : timesRun_(r.timesRun_),
50  timesPassed_(r.timesPassed_),
51  timesFailed_(r.timesFailed_),
52  timesExcept_(r.timesExcept_),
53  failedModuleIndex_(r.failedModuleIndex_),
54  state_(r.state_),
55  bitpos_(r.bitpos_),
56  trptr_(r.trptr_),
57  actReg_(r.actReg_),
58  act_table_(r.act_table_),
59  workers_(r.workers_),
60  pathContext_(r.pathContext_),
61  stopProcessingEvent_(r.stopProcessingEvent_),
62  pathStatusInserter_(r.pathStatusInserter_),
63  pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
64  for (auto& workerInPath : workers_) {
65  workerInPath.setPathContext(&pathContext_);
66  }
67  modulesToRun_ = workers_.size();
68  }
69 
71  int nwrwue,
72  bool isEvent,
73  bool begin,
74  BranchType branchType,
75  ModuleDescription const& desc,
76  std::string const& id) const {
77  if (e.context().empty()) {
78  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
79  }
80  bool should_continue = true;
81 
82  // there is no support as of yet for specific paths having
83  // different exception behavior
84 
85  // If not processing an event, always rethrow.
87  switch (action) {
89  should_continue = false;
90  edm::printCmsExceptionWarning("FailPath", e);
91  break;
92  }
94  //Need the other Paths to stop as soon as possible
96  *stopProcessingEvent_ = true;
97  }
98  break;
99  }
100  default: {
101  if (action == exception_actions::Rethrow) {
103  if (e.category() == pNF) {
104  std::ostringstream ost;
105  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
106  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
107  "configuration.\n";
108  e.addAdditionalInfo(ost.str());
109  }
110  }
111  //throw will copy which will slice the object
112  e.raise();
113  }
114  }
115 
116  return should_continue;
117  }
118 
120  bool isEvent,
121  bool begin,
122  BranchType branchType,
123  ModuleDescription const& desc,
124  std::string const& id,
125  PathContext const& pathContext) {
126  std::ostringstream ost;
127  ost << "Running path '" << pathContext.pathName() << "'";
128  ex.addContext(ost.str());
129  ost.str("");
130  ost << "Processing ";
131  //For the event case, the Worker has already
132  // added the necessary module context to the exception
133  if (begin && branchType == InRun) {
134  ost << "stream begin Run";
135  } else if (begin && branchType == InLumi) {
136  ost << "stream begin LuminosityBlock ";
137  } else if (!begin && branchType == InLumi) {
138  ost << "stream end LuminosityBlock ";
139  } else if (!begin && branchType == InRun) {
140  ost << "stream end Run ";
141  } else if (isEvent) {
142  // It should be impossible to get here ...
143  ost << "Event ";
144  }
145  ost << id;
146  ex.addContext(ost.str());
147  }
148 
149  void Path::threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr iExcept) {
150  bool expected = false;
151  while (stateLock_.compare_exchange_strong(expected, true)) {
152  expected = false;
153  }
154  if (iExcept) {
155  if (state_ == hlt::Exception) {
156  if (nwrwue < failedModuleIndex_) {
157  failedModuleIndex_ = nwrwue;
158  }
159  } else {
161  failedModuleIndex_ = nwrwue;
162  }
163  } else {
164  if (state_ != hlt::Exception) {
165  if (nwrwue < failedModuleIndex_) {
166  failedModuleIndex_ = nwrwue;
167  }
168  state_ = hlt::Fail;
169  }
170  }
171 
172  stateLock_ = false;
173  }
174 
176  if (trptr_) {
177  (*trptr_)[bitpos_] = HLTPathStatus(state, nwrwue);
178  }
179  }
180 
182  switch (state) {
183  case hlt::Pass: {
184  ++timesPassed_;
185  break;
186  }
187  case hlt::Fail: {
188  ++timesFailed_;
189  break;
190  }
191  case hlt::Exception: {
192  ++timesExcept_;
193  }
194  default:;
195  }
196  }
197 
199  using std::placeholders::_1;
201  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
202  }
203 
204  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
205  for (unsigned int index = 0; index != size(); ++index) {
206  auto found = iWorkerToDeleter.find(getWorker(index));
207  if (found != iWorkerToDeleter.end()) {
208  found->second->addedToPath();
209  }
210  }
211  }
212 
213  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
214  pathStatusInserter_ = pathStatusInserter;
215  pathStatusInserterWorker_ = pathStatusInserterWorker;
216  }
217 
219  EventTransitionInfo const& iInfo,
220  ServiceToken const& iToken,
221  StreamID const& iStreamID,
222  StreamContext const* iStreamContext) {
224  modulesToRun_ = workers_.size();
225  ++timesRun_;
226  waitingTasks_.add(iTask);
227  if (actReg_) {
228  ServiceRegistry::Operate guard(iToken);
229  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
230  }
231  //If the Path succeeds, these are the values we have at the end
232  state_ = hlt::Pass;
233  failedModuleIndex_ = workers_.size() - 1;
234 
235  if (workers_.empty()) {
236  ServiceRegistry::Operate guard(iToken);
237  finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
238  return;
239  }
240 
241  runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
242  }
243 
244  void Path::workerFinished(std::exception_ptr const* iException,
245  unsigned int iModuleIndex,
246  EventTransitionInfo const& iInfo,
247  ServiceToken const& iToken,
248  StreamID const& iID,
249  StreamContext const* iContext,
250  tbb::task_group& iGroup) {
251  EventPrincipal const& iEP = iInfo.principal();
252  ServiceRegistry::Operate guard(iToken);
253 
254  //This call also allows the WorkerInPath to update statistics
255  // so should be done even if an exception happened
256  auto& worker = workers_[iModuleIndex];
257  bool shouldContinue = worker.checkResultsOfRunWorker(true);
258  std::exception_ptr finalException;
259  if (iException) {
260  std::unique_ptr<cms::Exception> pEx;
261  try {
262  std::rethrow_exception(*iException);
263  } catch (cms::Exception& oldEx) {
264  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
265  }
266  // Caught exception is propagated via WaitingTaskList
267  CMS_SA_ALLOW try {
268  std::ostringstream ost;
269  ost << iEP.id();
270  ModuleDescription const* desc = worker.getWorker()->description();
271  assert(desc != nullptr);
272  shouldContinue = handleWorkerFailure(*pEx,
273  iModuleIndex,
274  /*isEvent*/ true,
275  /*isBegin*/ true,
276  InEvent,
277  *desc,
278  ost.str());
279  //If we didn't rethrow, then we effectively skipped
280  worker.skipWorker(iEP);
281  finalException = std::exception_ptr();
282  } catch (...) {
283  shouldContinue = false;
284  finalException = std::current_exception();
285  //set the exception early to avoid case where another Path is waiting
286  // on a module in this Path and not running the module will lead to a
287  // different but related exception in the other Path. We want this
288  // Paths exception to be the one that gets reported.
289  waitingTasks_.presetTaskAsFailed(finalException);
290  }
291  }
293  shouldContinue = false;
294  }
295  auto const nextIndex = iModuleIndex + 1;
296  if (shouldContinue and nextIndex < workers_.size()) {
297  if (not worker.runConcurrently()) {
298  --modulesToRun_;
299  runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
300  return;
301  }
302  }
303 
304  if (not shouldContinue) {
305  threadsafe_setFailedModuleInfo(iModuleIndex, finalException);
306  }
307  if (not shouldContinue and not worker.runConcurrently()) {
308  //we are leaving the path early
309  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
310  --modulesToRun_;
311  it->skipWorker(iEP);
312  }
313  }
314  if (--modulesToRun_ == 0) {
315  //The path should only be marked as finished once all outstanding modules finish
316  finished(finalException, iContext, iInfo, iID);
317  }
318  }
319 
320  void Path::finished(std::exception_ptr iException,
321  StreamContext const* iContext,
322  EventTransitionInfo const& iInfo,
323  StreamID const& streamID) {
326  // Caught exception is propagated via WaitingTaskList
327  CMS_SA_ALLOW try {
329 
330  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
331  pathStatusInserter_->setPathStatus(streamID, status);
332  }
333  std::exception_ptr jException =
335  iInfo, streamID, ParentContext(iContext), iContext);
336  if (jException && not iException) {
337  iException = jException;
338  }
339  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
340  } catch (...) {
341  if (not iException) {
342  iException = std::current_exception();
343  }
344  }
345  waitingTasks_.doneWaiting(iException);
346  }
347 
348  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
349  EventTransitionInfo const& iInfo,
350  ServiceToken const& iToken,
351  StreamID const& iID,
352  StreamContext const* iContext,
353  tbb::task_group& iGroup) {
354  //Figure out which next modules can run concurrently
355  const int firstModuleIndex = iNextModuleIndex;
356  int lastModuleIndex = firstModuleIndex;
357  while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
358  ++lastModuleIndex;
359  }
360  for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
361  ServiceWeakToken weakToken = iToken;
362  auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
363  std::exception_ptr const* iException) {
364  this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
365  });
367  WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
368  }
369  }
370 
371 } // namespace edm
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
Definition: Path.cc:244
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1146
std::string const & pathName() const
Definition: PathContext.h:30
TrigResPtr const trptr_
Definition: Path.h:108
ServiceToken lock() const
Definition: ServiceToken.h:101
const auto & workers
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
std::atomic< unsigned int > modulesToRun_
Definition: Path.h:118
void recordStatus(int nwrwue, hlt::HLTState state)
Definition: Path.cc:175
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:52
uint16_t *__restrict__ id
int timesFailed_
Definition: Path.h:98
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:45
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
Definition: Path.cc:149
EventPrincipal & principal()
HLTState
status of a trigger path
Definition: HLTenums.h:16
EventID const & id() const
reject
Definition: HLTenums.h:19
void raise()
Definition: Exception.h:101
list status
Definition: mps_update.py:107
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:99
PathContext pathContext_
Definition: Path.h:115
size_type size() const
Definition: Path.h:84
WaitingTaskList waitingTasks_
Definition: Path.h:116
void reset()
Resets access to the resource so that added tasks will wait.
int const bitpos_
Definition: Path.h:107
assert(be >=bs)
actions
Definition: Schedule.cc:687
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:213
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:47
accept
Definition: HLTenums.h:18
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
Definition: Path.cc:320
Definition: Path.h:41
std::list< std::string > const & context() const
Definition: Exception.cc:147
void updateCounters(hlt::HLTState state)
Definition: Path.cc:181
std::atomic< bool > *const stopProcessingEvent_
Definition: Path.h:117
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
areg
Definition: Schedule.cc:687
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
Definition: Path.cc:348
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
ExceptionToActionTable const *const act_table_
Definition: Path.h:111
WorkersInPath workers_
Definition: Path.h:113
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:119
void presetTaskAsFailed(std::exception_ptr iExcept)
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:18
int failedModuleIndex_
Definition: Path.h:104
void addContext(std::string const &context)
Definition: Exception.cc:165
std::shared_ptr< ActivityRegistry > const actReg_
Definition: Path.h:110
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:204
string action
Definition: mps_fire.py:183
tbb::task_group * group() const noexcept
State state_
Definition: Path.h:105
Worker const * getWorker(size_type i) const
Definition: Path.h:89
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
Definition: Path.cc:70
std::atomic< bool > stateLock_
Definition: Path.h:103
tuple size
Write out results.
int timesPassed_
Definition: Path.h:97
int timesRun_
Definition: Path.h:96
Worker * pathStatusInserterWorker_
Definition: Path.h:121
void clearCounters()
Definition: Path.cc:198
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:218
virtual Exception * clone() const
Definition: Exception.cc:181
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:120