CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Path.cc
Go to the documentation of this file.
1 
12 
13 #include <algorithm>
14 
15 namespace edm {
16  Path::Path(int bitpos,
17  std::string const& path_name,
18  WorkersInPath const& workers,
19  TrigResPtr trptr,
21  std::shared_ptr<ActivityRegistry> areg,
22  StreamContext const* streamContext,
23  std::atomic<bool>* stopProcessingEvent,
24  PathContext::PathType pathType)
25  : timesRun_(),
26  timesPassed_(),
27  timesFailed_(),
28  timesExcept_(),
29  failedModuleIndex_(workers.size()),
30  state_(hlt::Ready),
31  bitpos_(bitpos),
32  trptr_(trptr),
33  actReg_(areg),
34  act_table_(&actions),
35  workers_(workers),
36  pathContext_(path_name, streamContext, bitpos, pathType),
37  stopProcessingEvent_(stopProcessingEvent),
38  pathStatusInserter_(nullptr),
39  pathStatusInserterWorker_(nullptr) {
40  for (auto& workerInPath : workers_) {
41  workerInPath.setPathContext(&pathContext_);
42  }
43  modulesToRun_ = workers_.size();
44  }
45 
46  Path::Path(Path const& r)
47  : timesRun_(r.timesRun_),
48  timesPassed_(r.timesPassed_),
49  timesFailed_(r.timesFailed_),
50  timesExcept_(r.timesExcept_),
51  failedModuleIndex_(r.failedModuleIndex_),
52  state_(r.state_),
53  bitpos_(r.bitpos_),
54  trptr_(r.trptr_),
55  actReg_(r.actReg_),
56  act_table_(r.act_table_),
57  workers_(r.workers_),
58  pathContext_(r.pathContext_),
59  stopProcessingEvent_(r.stopProcessingEvent_),
60  pathStatusInserter_(r.pathStatusInserter_),
61  pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
62  for (auto& workerInPath : workers_) {
63  workerInPath.setPathContext(&pathContext_);
64  }
65  modulesToRun_ = workers_.size();
66  }
67 
69  int nwrwue,
70  bool isEvent,
71  bool begin,
72  BranchType branchType,
73  ModuleDescription const& desc,
74  std::string const& id) const {
75  if (e.context().empty()) {
76  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
77  }
78  bool should_continue = true;
79 
80  // there is no support as of yet for specific paths having
81  // different exception behavior
82 
83  // If not processing an event, always rethrow.
85  switch (action) {
87  should_continue = false;
88  edm::printCmsExceptionWarning("FailPath", e);
89  break;
90  }
92  //Need the other Paths to stop as soon as possible
94  *stopProcessingEvent_ = true;
95  }
96  break;
97  }
98  default: {
101  if (e.category() == pNF) {
102  std::ostringstream ost;
103  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
104  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
105  "configuration.\n";
106  e.addAdditionalInfo(ost.str());
107  }
108  }
109  //throw will copy which will slice the object
110  e.raise();
111  }
112  }
113 
114  return should_continue;
115  }
116 
118  bool isEvent,
119  bool begin,
120  BranchType branchType,
121  ModuleDescription const& desc,
122  std::string const& id,
123  PathContext const& pathContext) {
124  std::ostringstream ost;
125  ost << "Running path '" << pathContext.pathName() << "'";
126  ex.addContext(ost.str());
127  ost.str("");
128  ost << "Processing ";
129  //For the event case, the Worker has already
130  // added the necessary module context to the exception
131  if (begin && branchType == InRun) {
132  ost << "stream begin Run";
133  } else if (begin && branchType == InLumi) {
134  ost << "stream begin LuminosityBlock ";
135  } else if (!begin && branchType == InLumi) {
136  ost << "stream end LuminosityBlock ";
137  } else if (!begin && branchType == InRun) {
138  ost << "stream end Run ";
139  } else if (isEvent) {
140  // It should be impossible to get here ...
141  ost << "Event ";
142  }
143  ost << id;
144  ex.addContext(ost.str());
145  }
146 
147  void Path::threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr iExcept) {
148  bool expected = false;
149  while (stateLock_.compare_exchange_strong(expected, true)) {
150  expected = false;
151  }
152  if (iExcept) {
153  if (state_ == hlt::Exception) {
154  if (nwrwue < failedModuleIndex_) {
155  failedModuleIndex_ = nwrwue;
156  }
157  } else {
159  failedModuleIndex_ = nwrwue;
160  }
161  } else {
162  if (state_ != hlt::Exception) {
163  if (nwrwue < failedModuleIndex_) {
164  failedModuleIndex_ = nwrwue;
165  }
166  state_ = hlt::Fail;
167  }
168  }
169 
170  stateLock_ = false;
171  }
172 
173  void Path::recordStatus(int nwrwue, hlt::HLTState state) {
174  if (trptr_) {
175  (*trptr_)[bitpos_] = HLTPathStatus(state, nwrwue);
176  }
177  }
178 
180  switch (state) {
181  case hlt::Pass: {
182  ++timesPassed_;
183  break;
184  }
185  case hlt::Fail: {
186  ++timesFailed_;
187  break;
188  }
189  case hlt::Exception: {
190  ++timesExcept_;
191  }
192  default:;
193  }
194  }
195 
197  using std::placeholders::_1;
199  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
200  }
201 
202  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
203  for (unsigned int index = 0; index != size(); ++index) {
204  auto found = iWorkerToDeleter.find(getWorker(index));
205  if (found != iWorkerToDeleter.end()) {
206  found->second->addedToPath();
207  }
208  }
209  }
210 
211  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
212  pathStatusInserter_ = pathStatusInserter;
213  pathStatusInserterWorker_ = pathStatusInserterWorker;
214  }
215 
217  EventPrincipal const& iEP,
218  EventSetupImpl const& iES,
219  ServiceToken const& iToken,
220  StreamID const& iStreamID,
221  StreamContext const* iStreamContext) {
223  modulesToRun_ = workers_.size();
224  ++timesRun_;
225  waitingTasks_.add(iTask);
226  if (actReg_) {
227  ServiceRegistry::Operate guard(iToken);
228  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
229  }
230  //If the Path succeeds, these are the values we have at the end
231  state_ = hlt::Pass;
232  failedModuleIndex_ = workers_.size() - 1;
233 
234  if (workers_.empty()) {
235  ServiceRegistry::Operate guard(iToken);
236  finished(std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
237  return;
238  }
239 
240  runNextWorkerAsync(0, iEP, iES, iToken, iStreamID, iStreamContext);
241  }
242 
243  void Path::workerFinished(std::exception_ptr const* iException,
244  unsigned int iModuleIndex,
245  EventPrincipal const& iEP,
246  EventSetupImpl const& iES,
247  ServiceToken const& iToken,
248  StreamID const& iID,
249  StreamContext const* iContext) {
250  ServiceRegistry::Operate guard(iToken);
251 
252  //This call also allows the WorkerInPath to update statistics
253  // so should be done even if an exception happened
254  auto& worker = workers_[iModuleIndex];
255  bool shouldContinue = worker.checkResultsOfRunWorker(true);
256  std::exception_ptr finalException;
257  if (iException) {
258  std::unique_ptr<cms::Exception> pEx;
259  try {
260  std::rethrow_exception(*iException);
261  } catch (cms::Exception& oldEx) {
262  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
263  }
264  // Caught exception is propagated via WaitingTaskList
265  CMS_SA_ALLOW try {
266  std::ostringstream ost;
267  ost << iEP.id();
268  shouldContinue = handleWorkerFailure(*pEx,
269  iModuleIndex,
270  /*isEvent*/ true,
271  /*isBegin*/ true,
272  InEvent,
273  worker.getWorker()->description(),
274  ost.str());
275  //If we didn't rethrow, then we effectively skipped
276  worker.skipWorker(iEP);
277  finalException = std::exception_ptr();
278  } catch (...) {
279  shouldContinue = false;
280  finalException = std::current_exception();
281  //set the exception early to avoid case where another Path is waiting
282  // on a module in this Path and not running the module will lead to a
283  // different but related exception in the other Path. We want this
284  // Paths exception to be the one that gets reported.
285  waitingTasks_.presetTaskAsFailed(finalException);
286  }
287  }
289  shouldContinue = false;
290  }
291  auto const nextIndex = iModuleIndex + 1;
292  if (shouldContinue and nextIndex < workers_.size()) {
293  if (not worker.runConcurrently()) {
294  --modulesToRun_;
295  runNextWorkerAsync(nextIndex, iEP, iES, iToken, iID, iContext);
296  return;
297  }
298  }
299 
300  if (not shouldContinue) {
301  threadsafe_setFailedModuleInfo(iModuleIndex, finalException);
302  }
303  if (not shouldContinue and not worker.runConcurrently()) {
304  //we are leaving the path early
305  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
306  --modulesToRun_;
307  it->skipWorker(iEP);
308  }
309  }
310  if (--modulesToRun_ == 0) {
311  //The path should only be marked as finished once all outstanding modules finish
312  finished(finalException, iContext, iEP, iES, iID);
313  }
314  }
315 
316  void Path::finished(std::exception_ptr iException,
317  StreamContext const* iContext,
318  EventPrincipal const& iEP,
319  EventSetupImpl const& iES,
320  StreamID const& streamID) {
323  // Caught exception is propagated via WaitingTaskList
324  CMS_SA_ALLOW try {
326 
327  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
329  }
330  std::exception_ptr jException =
332  iEP, iES, streamID, ParentContext(iContext), iContext);
333  if (jException && not iException) {
334  iException = jException;
335  }
336  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
337  } catch (...) {
338  if (not iException) {
339  iException = std::current_exception();
340  }
341  }
342  waitingTasks_.doneWaiting(iException);
343  }
344 
345  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
346  EventPrincipal const& iEP,
347  EventSetupImpl const& iES,
348  ServiceToken const& iToken,
349  StreamID const& iID,
350  StreamContext const* iContext) {
351  //Figure out which next modules can run concurrently
352  const int firstModuleIndex = iNextModuleIndex;
353  int lastModuleIndex = firstModuleIndex;
354  while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
355  ++lastModuleIndex;
356  }
357  for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
358  auto nextTask = make_waiting_task(
359  tbb::task::allocate_root(),
360  [this, lastModuleIndex, &iEP, &iES, iID, iContext, token = iToken](std::exception_ptr const* iException) {
361  this->workerFinished(iException, lastModuleIndex, iEP, iES, token, iID, iContext);
362  });
364  nextTask, iEP, iES, iToken, iID, iContext);
365  }
366  }
367 
368 } // namespace edm
edm::hlt::HLTState
HLTState
status of a trigger path
Definition: HLTenums.h:16
edm::Path::pathStatusInserterWorker_
Worker * pathStatusInserterWorker_
Definition: Path.h:131
edm::StreamID
Definition: StreamID.h:30
edm::Path::pathStatusInserter_
PathStatusInserter * pathStatusInserter_
Definition: Path.h:130
edm::Path::getWorker
Worker const * getWorker(size_type i) const
Definition: Path.h:95
MessageLogger.h
cms::Exception::addContext
void addContext(std::string const &context)
Definition: Exception.cc:165
edm::Path::stateLock_
std::atomic< bool > stateLock_
Definition: Path.h:113
edm::Path::threadsafe_setFailedModuleInfo
void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr)
Definition: Path.cc:147
edm::exception_actions::Rethrow
Definition: ExceptionActions.h:11
edm::Path::waitingTasks_
WaitingTaskList waitingTasks_
Definition: Path.h:126
edm::EventSetupImpl
Definition: EventSetupImpl.h:44
Path.h
mps_update.status
status
Definition: mps_update.py:69
WaitingTaskHolder.h
edm::Path::modulesToRun_
std::atomic< unsigned int > modulesToRun_
Definition: Path.h:128
edm
HLT enums.
Definition: AlignableModifier.h:19
edm::printCmsExceptionWarning
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
Definition: ExceptionMessages.cc:25
edm::WaitingTaskList::add
void add(WaitingTask *)
Adds task to the waiting list.
Definition: WaitingTaskList.cc:89
edm::Path::WorkersInPath
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:48
Algorithms.h
edm::Path::bitpos_
const int bitpos_
Definition: Path.h:117
edm::Path::TrigResPtr
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:50
edm::Path::processOneOccurrenceAsync
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:216
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
edm::EventPrincipal::id
EventID const & id() const
Definition: EventPrincipal.h:92
edm::InRun
Definition: BranchType.h:11
CMS_SA_ALLOW
#define CMS_SA_ALLOW
Definition: thread_safety_macros.h:5
edm::Path::timesFailed_
int timesFailed_
Definition: Path.h:108
edm::Path::finished
void finished(std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetupImpl const &iES, StreamID const &streamID)
Definition: Path.cc:316
edm::BranchType
BranchType
Definition: BranchType.h:11
edm::ModuleDescription
Definition: ModuleDescription.h:21
edm::Path::timesExcept_
int timesExcept_
Definition: Path.h:109
edm::WaitingTaskList::reset
void reset()
Resets access to the resource so that added tasks will wait.
Definition: WaitingTaskList.cc:51
edm::for_all
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
edm::WaitingTaskList::presetTaskAsFailed
void presetTaskAsFailed(std::exception_ptr iExcept)
Definition: WaitingTaskList.cc:124
edm::errors::ProductNotFound
Definition: EDMException.h:33
edm::ServiceToken
Definition: ServiceToken.h:40
edm::Path::stopProcessingEvent_
std::atomic< bool > *const stopProcessingEvent_
Definition: Path.h:127
edm::Path::updateCounters
void updateCounters(hlt::HLTState state)
Definition: Path.cc:179
edm::EventPrincipal
Definition: EventPrincipal.h:46
edm::StreamContext
Definition: StreamContext.h:31
EarlyDeleteHelper.h
edm::Path::setPathStatusInserter
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:211
edm::Path::pathContext_
PathContext pathContext_
Definition: Path.h:125
edm::PathContext::PathType
PathType
Definition: PathContext.h:26
edm::exception_actions::FailPath
Definition: ExceptionActions.h:11
edm::OccurrenceTraits< EventPrincipal, BranchActionStreamBegin >
Definition: OccurrenceTraits.h:34
edm::WaitingTaskList::doneWaiting
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
Definition: WaitingTaskList.cc:169
edm::InEvent
Definition: BranchType.h:11
edm::exception_actions::SkipEvent
Definition: ExceptionActions.h:11
edm::Worker
Definition: Worker.h:83
edm::make_waiting_task
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
edm::Path::trptr_
const TrigResPtr trptr_
Definition: Path.h:118
edm::ParentContext
Definition: ParentContext.h:27
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::PathStatusInserter::setPathStatus
void setPathStatus(StreamID const &, HLTPathStatus const &)
Definition: PathStatusInserter.cc:12
edm::hlt::Fail
reject
Definition: HLTenums.h:19
edm::Path::Path
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:16
edm::Path::size
size_type size() const
Definition: Path.h:90
edm::InLumi
Definition: BranchType.h:11
Ready
Definition: hltDiff.cc:243
edm::PathContext
Definition: PathContext.h:24
edm::Path::act_table_
ExceptionToActionTable const *const act_table_
Definition: Path.h:121
edm::Path::workers_
WorkersInPath workers_
Definition: Path.h:123
ExceptionMessages.h
edm::PathStatusInserter
Definition: PathStatusInserter.h:15
edm::Path
Definition: Path.h:44
edm::HLTPathStatus
Definition: HLTPathStatus.h:33
edm::Path::exceptionContext
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:117
edm::Path::actReg_
const std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:120
writedatasetfile.action
action
Definition: writedatasetfile.py:8
ExceptionActions.h
edm::Path::failedModuleIndex_
int failedModuleIndex_
Definition: Path.h:114
edm::ExceptionToActionTable
Definition: ExceptionActions.h:16
PathStatusInserter.h
edm::PathContext::pathName
std::string const & pathName() const
Definition: PathContext.h:30
edm::Path::setEarlyDeleteHelpers
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:202
alignCSCRings.r
r
Definition: alignCSCRings.py:93
edm::WorkerInPath::clearCounters
void clearCounters()
Definition: WorkerInPath.h:45
OccurrenceTraits.h
ParentContext.h
edm::WaitingTask
Definition: WaitingTask.h:36
edm::Exception::codeToString
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:52
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:31
cms::Exception::clone
virtual Exception * clone() const
Definition: Exception.cc:181
edm::ExceptionToActionTable::find
exception_actions::ActionCodes find(const std::string &category) const
Definition: ExceptionActions.cc:85
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
actions
roAction_t actions[nactions]
Definition: GenABIO.cc:181
edm::Worker::runModuleDirectly
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1137
cms::Exception
Definition: Exception.h:70
edm::Path::handleWorkerFailure
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id) const
Definition: Path.cc:68
edm::hlt::Exception
error
Definition: HLTenums.h:20
edm::Path::timesRun_
int timesRun_
Definition: Path.h:106
edm::hlt::Pass
accept
Definition: HLTenums.h:18
edm::Path::clearCounters
void clearCounters()
Definition: Path.cc:196
edm::Path::recordStatus
void recordStatus(int nwrwue, hlt::HLTState state)
Definition: Path.cc:173
edm::Path::workerFinished
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetupImpl const &iES, ServiceToken const &iToken, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:243
edm::ServiceRegistry::Operate
Definition: ServiceRegistry.h:40
edm::Path::state_
State state_
Definition: Path.h:115
EcalCalibMonitorClient_cfi.workers
workers
Definition: EcalCalibMonitorClient_cfi.py:19
begin
#define begin
Definition: vmac.h:32
edm::Path::runNextWorkerAsync
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:345
edm::exception_actions::ActionCodes
ActionCodes
Definition: ExceptionActions.h:11
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
ValidationMatrix.hlt
hlt
Definition: ValidationMatrix.py:459
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316
edm::Path::timesPassed_
int timesPassed_
Definition: Path.h:107