CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
14 
15 #include <algorithm>
16 
17 namespace edm {
18  Path::Path(int bitpos,
19  std::string const& path_name,
20  WorkersInPath const& workers,
21  TrigResPtr trptr,
23  std::shared_ptr<ActivityRegistry> areg,
24  StreamContext const* streamContext,
25  std::atomic<bool>* stopProcessingEvent,
26  PathContext::PathType pathType)
27  : timesRun_(),
28  timesPassed_(),
29  timesFailed_(),
30  timesExcept_(),
31  failedModuleIndex_(workers.size()),
32  state_(hlt::Ready),
33  bitpos_(bitpos),
34  trptr_(trptr),
35  actReg_(areg),
36  act_table_(&actions),
37  workers_(workers),
38  pathContext_(path_name, streamContext, bitpos, pathType),
39  stopProcessingEvent_(stopProcessingEvent),
40  pathStatusInserter_(nullptr),
41  pathStatusInserterWorker_(nullptr) {
42  for (auto& workerInPath : workers_) {
43  workerInPath.setPathContext(&pathContext_);
44  }
45  modulesToRun_ = workers_.size();
46  }
47 
48  Path::Path(Path const& r)
49  : timesRun_(r.timesRun_),
50  timesPassed_(r.timesPassed_),
51  timesFailed_(r.timesFailed_),
52  timesExcept_(r.timesExcept_),
53  failedModuleIndex_(r.failedModuleIndex_),
54  state_(r.state_),
55  bitpos_(r.bitpos_),
56  trptr_(r.trptr_),
57  actReg_(r.actReg_),
58  act_table_(r.act_table_),
59  workers_(r.workers_),
60  pathContext_(r.pathContext_),
61  stopProcessingEvent_(r.stopProcessingEvent_),
62  pathStatusInserter_(r.pathStatusInserter_),
63  pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
64  for (auto& workerInPath : workers_) {
65  workerInPath.setPathContext(&pathContext_);
66  }
67  modulesToRun_ = workers_.size();
68  }
69 
71  int nwrwue,
72  bool isEvent,
73  bool begin,
74  BranchType branchType,
75  ModuleDescription const& desc,
76  std::string const& id) const {
77  if (e.context().empty()) {
78  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
79  }
80  bool should_continue = true;
81 
82  // there is no support as of yet for specific paths having
83  // different exception behavior
84 
85  // If not processing an event, always rethrow.
87  switch (action) {
89  should_continue = false;
90  edm::printCmsExceptionWarning("FailPath", e);
91  break;
92  }
94  //Need the other Paths to stop as soon as possible
96  *stopProcessingEvent_ = true;
97  }
98  break;
99  }
100  default: {
103  if (e.category() == pNF) {
104  std::ostringstream ost;
105  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
106  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
107  "configuration.\n";
108  e.addAdditionalInfo(ost.str());
109  }
110  }
111  //throw will copy which will slice the object
112  e.raise();
113  }
114  }
115 
116  return should_continue;
117  }
118 
120  bool isEvent,
121  bool begin,
122  BranchType branchType,
123  ModuleDescription const& desc,
124  std::string const& id,
125  PathContext const& pathContext) {
126  std::ostringstream ost;
127  ost << "Running path '" << pathContext.pathName() << "'";
128  ex.addContext(ost.str());
129  ost.str("");
130  ost << "Processing ";
131  //For the event case, the Worker has already
132  // added the necessary module context to the exception
133  if (begin && branchType == InRun) {
134  ost << "stream begin Run";
135  } else if (begin && branchType == InLumi) {
136  ost << "stream begin LuminosityBlock ";
137  } else if (!begin && branchType == InLumi) {
138  ost << "stream end LuminosityBlock ";
139  } else if (!begin && branchType == InRun) {
140  ost << "stream end Run ";
141  } else if (isEvent) {
142  // It should be impossible to get here ...
143  ost << "Event ";
144  }
145  ost << id;
146  ex.addContext(ost.str());
147  }
148 
149  void Path::threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr iExcept) {
150  bool expected = false;
151  while (stateLock_.compare_exchange_strong(expected, true)) {
152  expected = false;
153  }
154  if (iExcept) {
155  if (state_ == hlt::Exception) {
156  if (nwrwue < failedModuleIndex_) {
157  failedModuleIndex_ = nwrwue;
158  }
159  } else {
161  failedModuleIndex_ = nwrwue;
162  }
163  } else {
164  if (state_ != hlt::Exception) {
165  if (nwrwue < failedModuleIndex_) {
166  failedModuleIndex_ = nwrwue;
167  }
168  state_ = hlt::Fail;
169  }
170  }
171 
172  stateLock_ = false;
173  }
174 
176  if (trptr_) {
177  (*trptr_)[bitpos_] = HLTPathStatus(state, nwrwue);
178  }
179  }
180 
182  switch (state) {
183  case hlt::Pass: {
184  ++timesPassed_;
185  break;
186  }
187  case hlt::Fail: {
188  ++timesFailed_;
189  break;
190  }
191  case hlt::Exception: {
192  ++timesExcept_;
193  }
194  default:;
195  }
196  }
197 
199  using std::placeholders::_1;
201  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
202  }
203 
204  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
205  for (unsigned int index = 0; index != size(); ++index) {
206  auto found = iWorkerToDeleter.find(getWorker(index));
207  if (found != iWorkerToDeleter.end()) {
208  found->second->addedToPath();
209  }
210  }
211  }
212 
213  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
214  pathStatusInserter_ = pathStatusInserter;
215  pathStatusInserterWorker_ = pathStatusInserterWorker;
216  }
217 
219  EventTransitionInfo const& iInfo,
220  ServiceToken const& iToken,
221  StreamID const& iStreamID,
222  StreamContext const* iStreamContext) {
224  modulesToRun_ = workers_.size();
225  ++timesRun_;
226  waitingTasks_.add(iTask);
227  if (actReg_) {
228  ServiceRegistry::Operate guard(iToken);
229  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
230  }
231  //If the Path succeeds, these are the values we have at the end
232  state_ = hlt::Pass;
233  failedModuleIndex_ = workers_.size() - 1;
234 
235  if (workers_.empty()) {
236  ServiceRegistry::Operate guard(iToken);
237  finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
238  return;
239  }
240 
241  runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
242  }
243 
244  void Path::workerFinished(std::exception_ptr const* iException,
245  unsigned int iModuleIndex,
246  EventTransitionInfo const& iInfo,
247  ServiceToken const& iToken,
248  StreamID const& iID,
249  StreamContext const* iContext,
250  tbb::task_group& iGroup) {
251  EventPrincipal const& iEP = iInfo.principal();
252  ServiceRegistry::Operate guard(iToken);
253 
254  //This call also allows the WorkerInPath to update statistics
255  // so should be done even if an exception happened
256  auto& worker = workers_[iModuleIndex];
257  bool shouldContinue = worker.checkResultsOfRunWorker(true);
258  std::exception_ptr finalException;
259  if (iException) {
260  std::unique_ptr<cms::Exception> pEx;
261  try {
262  std::rethrow_exception(*iException);
263  } catch (cms::Exception& oldEx) {
264  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
265  }
266  // Caught exception is propagated via WaitingTaskList
267  CMS_SA_ALLOW try {
268  std::ostringstream ost;
269  ost << iEP.id();
270  ModuleDescription const* desc = worker.getWorker()->description();
271  assert(desc != nullptr);
272  shouldContinue = handleWorkerFailure(*pEx,
273  iModuleIndex,
274  /*isEvent*/ true,
275  /*isBegin*/ true,
276  InEvent,
277  *desc,
278  ost.str());
279  //If we didn't rethrow, then we effectively skipped
280  worker.skipWorker(iEP);
281  finalException = std::exception_ptr();
282  } catch (...) {
283  shouldContinue = false;
284  finalException = std::current_exception();
285  //set the exception early to avoid case where another Path is waiting
286  // on a module in this Path and not running the module will lead to a
287  // different but related exception in the other Path. We want this
288  // Paths exception to be the one that gets reported.
289  waitingTasks_.presetTaskAsFailed(finalException);
290  }
291  }
293  shouldContinue = false;
294  }
295  auto const nextIndex = iModuleIndex + 1;
296  if (shouldContinue and nextIndex < workers_.size()) {
297  if (not worker.runConcurrently()) {
298  --modulesToRun_;
299  runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
300  return;
301  }
302  }
303 
304  if (not shouldContinue) {
305  threadsafe_setFailedModuleInfo(iModuleIndex, finalException);
306  }
307  if (not shouldContinue and not worker.runConcurrently()) {
308  //we are leaving the path early
309  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
310  --modulesToRun_;
311  it->skipWorker(iEP);
312  }
313  }
314  if (--modulesToRun_ == 0) {
315  //The path should only be marked as finished once all outstanding modules finish
316  finished(finalException, iContext, iInfo, iID);
317  }
318  }
319 
320  void Path::finished(std::exception_ptr iException,
321  StreamContext const* iContext,
322  EventTransitionInfo const& iInfo,
323  StreamID const& streamID) {
326  // Caught exception is propagated via WaitingTaskList
327  CMS_SA_ALLOW try {
329 
330  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
332  }
333  std::exception_ptr jException =
335  iInfo, streamID, ParentContext(iContext), iContext);
336  if (jException && not iException) {
337  iException = jException;
338  }
339  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
340  } catch (...) {
341  if (not iException) {
342  iException = std::current_exception();
343  }
344  }
345  waitingTasks_.doneWaiting(iException);
346  }
347 
348  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
349  EventTransitionInfo const& iInfo,
350  ServiceToken const& iToken,
351  StreamID const& iID,
352  StreamContext const* iContext,
353  tbb::task_group& iGroup) {
354  //Figure out which next modules can run concurrently
355  const int firstModuleIndex = iNextModuleIndex;
356  int lastModuleIndex = firstModuleIndex;
357  while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
358  ++lastModuleIndex;
359  }
360  for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
361  ServiceWeakToken weakToken = iToken;
362  auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
363  std::exception_ptr const* iException) {
364  this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
365  });
367  WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
368  }
369  }
370 
371 } // namespace edm
edm::EventTransitionInfo
Definition: TransitionInfoTypes.h:26
edm::hlt::HLTState
HLTState
status of a trigger path
Definition: HLTenums.h:16
edm::Path::pathStatusInserterWorker_
Worker * pathStatusInserterWorker_
Definition: Path.h:121
edm::StreamID
Definition: StreamID.h:30
edm::Path::pathStatusInserter_
PathStatusInserter * pathStatusInserter_
Definition: Path.h:120
edm::Path::getWorker
Worker const * getWorker(size_type i) const
Definition: Path.h:89
MessageLogger.h
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::ServiceWeakToken
Definition: ServiceToken.h:86
edm::Path::stateLock_
std::atomic< bool > stateLock_
Definition: Path.h:103
edm::Path::threadsafe_setFailedModuleInfo
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
Definition: Path.cc:149
edm::exception_actions::Rethrow
Definition: ExceptionActions.h:11
edm::Path::waitingTasks_
WaitingTaskList waitingTasks_
Definition: Path.h:116
Path.h
mps_update.status
status
Definition: mps_update.py:68
WaitingTaskHolder.h
edm::Path::modulesToRun_
std::atomic< unsigned int > modulesToRun_
Definition: Path.h:118
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::Worker::runModuleDirectly
std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const &, StreamID, ParentContext const &, typename T::Context const *)
Definition: Worker.h:1148
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
edm::Path::workerFinished
void workerFinished(std::exception_ptr const *, unsigned int iModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
Definition: Path.cc:244
edm::Path::WorkersInPath
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:45
Algorithms.h
cms::cuda::assert
assert(be >=bs)
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::Path::bitpos_
const int bitpos_
Definition: Path.h:107
edm::Path::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:47
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
edm::EventPrincipal::id
EventID const & id() const
Definition: EventPrincipal.h:96
edm::InRun
Definition: BranchType.h:11
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::Path::timesFailed_
int timesFailed_
Definition: Path.h:98
edm::BranchType
BranchType
Definition: BranchType.h:11
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::Path::timesExcept_
int timesExcept_
Definition: Path.h:99
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:53
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::WaitingTaskList::presetTaskAsFailed
void presetTaskAsFailed(std::exception_ptr iExcept)
Definition: WaitingTaskList.cc:163
edm::errors::ProductNotFound
Definition: EDMException.h:33
edm::ServiceToken
Definition: ServiceToken.h:42
edm::Path::stopProcessingEvent_
std::atomic< bool > *const stopProcessingEvent_
Definition: Path.h:117
edm::Path::updateCounters
void updateCounters(hlt::HLTState state)
Definition: Path.cc:181
edm::EventPrincipal
Definition: EventPrincipal.h:48
edm::StreamContext
Definition: StreamContext.h:31
EarlyDeleteHelper.h
edm::Path::setPathStatusInserter
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:213
EventPrincipal.h
edm::Path::pathContext_
PathContext pathContext_
Definition: Path.h:115
edm::PathContext::PathType
PathType
Definition: PathContext.h:26
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:38
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:212
edm::InEvent
Definition: BranchType.h:11
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
edm::Worker
Definition: Worker.h:91
edm::Path::trptr_
const TrigResPtr trptr_
Definition: Path.h:108
edm::ParentContext
Definition: ParentContext.h:27
edm::PathStatusInserter::setPathStatus
void setPathStatus(StreamID const &, HLTPathStatus const &)
Definition: PathStatusInserter.cc:12
edm::Path::finished
void finished(std::exception_ptr, StreamContext const *, EventTransitionInfo const &, StreamID const &)
Definition: Path.cc:320
edm::hlt::Fail
reject
Definition: HLTenums.h:19
edm::Path::Path
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:18
edm::Path::size
size_type size() const
Definition: Path.h:84
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:101
edm::WaitingTaskHolder
Definition: WaitingTaskHolder.h:32
edm::InLumi
Definition: BranchType.h:11
edm::Path::runNextWorkerAsync
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *, tbb::task_group &iGroup)
Definition: Path.cc:348
Ready
Definition: hltDiff.cc:242
edm::PathContext
Definition: PathContext.h:24
edm::Path::act_table_
ExceptionToActionTable const *const act_table_
Definition: Path.h:111
edm::Path::workers_
WorkersInPath workers_
Definition: Path.h:113
ExceptionMessages.h
edm::PathStatusInserter
Definition: PathStatusInserter.h:15
edm::Path
Definition: Path.h:41
edm::HLTPathStatus
Definition: HLTPathStatus.h:33
edm::Path::exceptionContext
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:119
edm::Path::actReg_
const std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:110
writedatasetfile.action
action
Definition: writedatasetfile.py:8
ExceptionActions.h
edm::Path::failedModuleIndex_
int failedModuleIndex_
Definition: Path.h:104
edm::ServiceWeakToken::lock
ServiceToken lock() const
Definition: ServiceToken.h:101
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
PathStatusInserter.h
edm::PathContext::pathName
std::string const & pathName() const
Definition: PathContext.h:30
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::Path::setEarlyDeleteHelpers
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:204
alignCSCRings.r
r
Definition: alignCSCRings.py:93
edm::WorkerInPath::clearCounters
void clearCounters()
Definition: WorkerInPath.h:44
OccurrenceTraits.h
edm::WaitingTaskList::add
void add(tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:125
ParentContext.h
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
edm::Exception::codeToString
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:52
RunInfoPI::state
state
Definition: RunInfoPayloadInspectoHelper.h:16
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:29
edm::Path::processOneOccurrenceAsync
void processOneOccurrenceAsync(WaitingTaskHolder, EventTransitionInfo const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:218
cms::Exception::clone
virtual Exception * clone() const
Definition: Exception.cc:181
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
cms::Exception
Definition: Exception.h:70
TransitionInfoTypes.h
edm::Path::handleWorkerFailure
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
Definition: Path.cc:70
edm::EventTransitionInfo::principal
EventPrincipal & principal()
Definition: TransitionInfoTypes.h:33
edm::hlt::Exception
error
Definition: HLTenums.h:20
edm::Path::timesRun_
int timesRun_
Definition: Path.h:96
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:198
edm::Path::recordStatus
void recordStatus(int nwrwue, hlt::HLTState state)
Definition: Path.cc:175
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::Path::state_
State state_
Definition: Path.h:105
EcalCalibMonitorClient_cfi.workers
workers
Definition: EcalCalibMonitorClient_cfi.py:19
edm::WaitingTaskHolder::group
tbb::task_group * group() const noexcept
Definition: WaitingTaskHolder.h:77
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
ValidationMatrix.hlt
hlt
Definition: ValidationMatrix.py:459
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
edm::Path::timesPassed_
int timesPassed_
Definition: Path.h:97