CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
11 
12 #include <algorithm>
13 
14 namespace edm {
15  Path::Path(int bitpos,
16  std::string const& path_name,
17  WorkersInPath const& workers,
18  TrigResPtr trptr,
20  std::shared_ptr<ActivityRegistry> areg,
21  StreamContext const* streamContext,
22  std::atomic<bool>* stopProcessingEvent,
23  PathContext::PathType pathType)
24  : timesRun_(),
25  timesPassed_(),
26  timesFailed_(),
27  timesExcept_(),
28  state_(hlt::Ready),
29  bitpos_(bitpos),
30  trptr_(trptr),
31  actReg_(areg),
32  act_table_(&actions),
33  workers_(workers),
34  pathContext_(path_name, streamContext, bitpos, pathType),
35  stopProcessingEvent_(stopProcessingEvent),
36  pathStatusInserter_(nullptr),
37  pathStatusInserterWorker_(nullptr) {
38  for (auto& workerInPath : workers_) {
39  workerInPath.setPathContext(&pathContext_);
40  }
41  }
42 
// Copy constructor. The initializers visible here copy the run counter, state,
// bit position, trigger-results pointer, activity registry and worker list from `r`.
// NOTE(review): the embedded listing numbers jump 44->48, 51->53 and 53->59, so the
// remaining member initializers and the opening brace of the body are not visible in
// this extract - confirm against the real source before editing.
43  Path::Path(Path const& r)
44  : timesRun_(r.timesRun_),
48  state_(r.state_),
49  bitpos_(r.bitpos_),
50  trptr_(r.trptr_),
51  actReg_(r.actReg_),
53  workers_(r.workers_),
// The copied WorkerInPath objects are pointed at this object's pathContext_.
59  for (auto& workerInPath : workers_) {
60  workerInPath.setPathContext(&pathContext_);
61  }
62  }
63 
// Path::handleWorkerFailure: reacts to an exception `e` raised while running a worker
// of this path. Returns should_continue; the default branch never returns normally
// because it ends in e.raise().
// NOTE(review): several listing lines are absent from this extract (64, 68, 80, 82,
// 87, 89, 96, 99): the first line of the header, the branchType parameter, the
// definition of `action`, the case labels and the definition of `pNF`. Comments below
// describe only what is visible.
65  int nwrwue,
66  bool isEvent,
67  bool begin,
69  ModuleDescription const& desc,
70  std::string const& id) {
// Add path/processing context only when the exception carries no context yet.
71  if (e.context().empty()) {
72  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
73  }
74  bool should_continue = true;
75 
76  // there is no support as of yet for specific paths having
77  // different exception behavior
78 
79  // If not processing an event, always rethrow.
81  switch (action) {
// (case label not visible) The path stops, and a "FailPath" warning is printed.
83  should_continue = false;
84  edm::printCmsExceptionWarning("FailPath", e);
85  break;
86  }
88  //Need the other Paths to stop as soon as possible
// (enclosing condition not visible) Sets the stop flag pointed to by stopProcessingEvent_.
90  *stopProcessingEvent_ = true;
91  }
92  }
// Every remaining action: count the exception (events only), record the path status,
// optionally attach a configuration hint, then re-raise.
93  default: {
94  if (isEvent)
95  ++timesExcept_;
97  recordStatus(nwrwue, isEvent);
98  if (action == exception_actions::Rethrow) {
// For exceptions whose category equals pNF, append a hint on how to skip such events.
100  if (e.category() == pNF) {
101  std::ostringstream ost;
102  ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
103  << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
104  "configuration.\n";
105  e.addAdditionalInfo(ost.str());
106  }
107  }
108  //throw will copy which will slice the object
109  e.raise();
110  }
111  }
112 
113  return should_continue;
114  }
115 
117  bool isEvent,
118  bool begin,
120  ModuleDescription const& desc,
121  std::string const& id,
122  PathContext const& pathContext) {
123  std::ostringstream ost;
124  ost << "Running path '" << pathContext.pathName() << "'";
125  ex.addContext(ost.str());
126  ost.str("");
127  ost << "Processing ";
128  //For the event case, the Worker has already
129  // added the necessary module context to the exception
130  if (begin && branchType == InRun) {
131  ost << "stream begin Run";
132  } else if (begin && branchType == InLumi) {
133  ost << "stream begin LuminosityBlock ";
134  } else if (!begin && branchType == InLumi) {
135  ost << "stream end LuminosityBlock ";
136  } else if (!begin && branchType == InRun) {
137  ost << "stream end Run ";
138  } else if (isEvent) {
139  // It should be impossible to get here ...
140  ost << "Event ";
141  }
142  ost << id;
143  ex.addContext(ost.str());
144  }
145 
146  void Path::recordStatus(int nwrwue, bool isEvent) {
147  if (isEvent && trptr_) {
148  (*trptr_)[bitpos_] = HLTPathStatus(state_, nwrwue);
149  }
150  }
151 
152  void Path::updateCounters(bool success, bool isEvent) {
153  if (success) {
154  if (isEvent)
155  ++timesPassed_;
156  state_ = hlt::Pass;
157  } else {
158  if (isEvent)
159  ++timesFailed_;
160  state_ = hlt::Fail;
161  }
162  }
163 
// Path::clearCounters: the visible part calls WorkerInPath::clearCounters on every
// element of workers_.
// NOTE(review): listing lines 164 (the function header) and 166 are absent from this
// extract; whatever line 166 does is not visible here.
165  using std::placeholders::_1;
167  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
168  }
169 
170  void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
171  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
172  // have to use the space necessary
173  std::vector<EarlyDeleteHelper*> temp;
174  temp.reserve(iWorkerToDeleter.size());
175  for (unsigned int index = 0; index != size(); ++index) {
176  auto found = iWorkerToDeleter.find(getWorker(index));
177  if (found != iWorkerToDeleter.end()) {
178  temp.push_back(found->second);
179  found->second->addedToPath();
180  }
181  }
182  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(), temp.end());
183  earlyDeleteHelpers_.swap(tempCorrectSize);
184  }
185 
186  void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
187  pathStatusInserter_ = pathStatusInserter;
188  pathStatusInserterWorker_ = pathStatusInserterWorker;
189  }
190 
192  for (auto helper : earlyDeleteHelpers_) {
193  helper->pathFinished(iEvent);
194  }
195  }
196 
// Path::processOneOccurrenceAsync: starts processing of one event on this path.
// NOTE(review): listing lines 197 (first header line, which declares iTask) and 203
// are absent from this extract.
198  EventPrincipal const& iEP,
199  EventSetupImpl const& iES,
200  ServiceToken const& iToken,
201  StreamID const& iStreamID,
202  StreamContext const* iStreamContext) {
// Count the run and register iTask on waitingTasks_.
204  ++timesRun_;
205  waitingTasks_.add(iTask);
// Emit the pre-path signal, if a registry is attached, with the service token active.
206  if (actReg_) {
207  ServiceRegistry::Operate guard(iToken);
208  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
209  }
210  state_ = hlt::Ready;
211 
// A path without workers finishes immediately as a success, with module index -1.
212  if (workers_.empty()) {
213  ServiceRegistry::Operate guard(iToken);
214  finished(-1, true, std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
215  return;
216  }
217 
// Otherwise start the chain with the first worker.
218  runNextWorkerAsync(0, iEP, iES, iToken, iStreamID, iStreamContext);
219  }
220 
// Called when the worker at iModuleIndex has completed. Decides whether to run the
// next worker, or to end the path (skipping the remaining workers when leaving early)
// and call finished().
// iException is null when the worker completed without an exception.
// NOTE(review): listing line 265 is absent from this extract; it presumably opens the
// `if` whose body is line 266 - confirm against the real source.
221  void Path::workerFinished(std::exception_ptr const* iException,
222  unsigned int iModuleIndex,
223  EventPrincipal const& iEP,
224  EventSetupImpl const& iES,
225  ServiceToken const& iToken,
226  StreamID const& iID,
227  StreamContext const* iContext) {
228  ServiceRegistry::Operate guard(iToken);
229 
230  //This call also allows the WorkerInPath to update statistics
231  // so should be done even if an exception happened
232  auto& worker = workers_[iModuleIndex];
233  bool shouldContinue = worker.checkResultsOfRunWorker(true);
234  std::exception_ptr finalException;
235  if (iException) {
// Rethrow to recover a typed, mutable copy of the exception.
// NOTE(review): only cms::Exception is caught; any other type would escape this
// function, and pEx would otherwise be dereferenced while null. Presumably callers
// guarantee a cms::Exception - confirm.
236  std::unique_ptr<cms::Exception> pEx;
237  try {
238  std::rethrow_exception(*iException);
239  } catch (cms::Exception& oldEx) {
240  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
241  }
242  try {
// The event id is passed as the `id` string of handleWorkerFailure.
243  std::ostringstream ost;
244  ost << iEP.id();
245  shouldContinue = handleWorkerFailure(*pEx,
246  iModuleIndex,
247  /*isEvent*/ true,
248  /*isBegin*/ true,
249  InEvent,
250  worker.getWorker()->description(),
251  ost.str());
252  //If we didn't rethrow, then we effectively skipped
253  worker.skipWorker(iEP);
254  finalException = std::exception_ptr();
255  } catch (...) {
256  shouldContinue = false;
257  finalException = std::current_exception();
258  //set the exception early to avoid case where another Path is waiting
259  // on a module in this Path and not running the module will lead to a
260  // different but related exception in the other Path. We want this
261  // Paths exception to be the one that gets reported.
262  waitingTasks_.presetTaskAsFailed(finalException);
263  }
264  }
266  shouldContinue = false;
267  }
// Continue with the next worker only if allowed and one remains.
268  auto const nextIndex = iModuleIndex + 1;
269  if (shouldContinue and nextIndex < workers_.size()) {
270  runNextWorkerAsync(nextIndex, iEP, iES, iToken, iID, iContext);
271  return;
272  }
273 
274  if (not shouldContinue) {
275  //we are leaving the path early
276  for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
277  it->skipWorker(iEP);
278  }
279  handleEarlyFinish(iEP);
280  }
281  finished(iModuleIndex, shouldContinue, finalException, iContext, iEP, iES, iID);
282  }
283 
// Final step for one event on this path: updates counters and recorded status (only
// when no exception is pending), hands the status to the path status inserter, emits
// the post-path signal and releases the tasks held in waitingTasks_.
// iException is taken by value and may be replaced locally by a later failure; the
// first exception seen is the one passed to doneWaiting().
// NOTE(review): listing line 302 (the right-hand side that starts the jException
// initializer) is absent from this extract.
284  void Path::finished(int iModuleIndex,
285  bool iSucceeded,
286  std::exception_ptr iException,
287  StreamContext const* iContext,
288  EventPrincipal const& iEP,
289  EventSetupImpl const& iES,
290  StreamID const& streamID) {
291  if (not iException) {
292  updateCounters(iSucceeded, true);
293  recordStatus(iModuleIndex, true);
294  }
295  try {
296  HLTPathStatus status(state_, iModuleIndex);
297 
298  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
299  pathStatusInserter_->setPathStatus(streamID, status);
300  }
301  std::exception_ptr jException =
303  iEP, iES, streamID, ParentContext(iContext), iContext);
// Keep an earlier exception if there is one; otherwise adopt the new one.
304  if (jException && not iException) {
305  iException = jException;
306  }
// NOTE(review): actReg_ is dereferenced without a null check here, whereas
// processOneOccurrenceAsync tests `if (actReg_)` first - confirm it cannot be null.
307  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
308  } catch (...) {
309  if (not iException) {
310  iException = std::current_exception();
311  }
312  }
313  waitingTasks_.doneWaiting(iException);
314  }
315 
// Schedules the worker at iNextModuleIndex and arranges for workerFinished() to be
// called with the same index once that worker's task completes.
// NOTE(review): listing line 328 (the call that receives nextTask and the arguments on
// line 329) is absent from this extract.
316  void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
317  EventPrincipal const& iEP,
318  EventSetupImpl const& iES,
319  ServiceToken const& iToken,
320  StreamID const& iID,
321  StreamContext const* iContext) {
// The continuation captures iEP and iES by reference and copies iToken (as `token`),
// iID, the index and the context pointer.
// NOTE(review): the by-reference captures assume iEP and iES outlive the task - confirm.
322  auto nextTask = make_waiting_task(
323  tbb::task::allocate_root(),
324  [this, iNextModuleIndex, &iEP, &iES, iID, iContext, token = iToken](std::exception_ptr const* iException) {
325  this->workerFinished(iException, iNextModuleIndex, iEP, iES, token, iID, iContext);
326  });
327 
329  nextTask, iEP, iES, iToken, iID, iContext);
330  }
331 
332 } // namespace edm
std::string const & pathName() const
Definition: PathContext.h:30
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:146
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:191
Definition: helper.py:1
not [yet] run
Definition: HLTenums.h:18
roAction_t actions[nactions]
Definition: GenABIO.cc:181
static const std::string & codeToString(Code)
------------ implementation details ------------
Definition: EDMException.cc:52
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:120
int timesFailed_
Definition: Path.h:109
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:48
void add(WaitingTask *)
Adds task to the waiting list.
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetupImpl const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1126
#define nullptr
EventID const & id() const
reject
Definition: HLTenums.h:20
void raise()
Definition: Exception.h:98
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:110
PathContext pathContext_
Definition: Path.h:122
size_type size() const
Definition: Path.h:91
WaitingTaskList waitingTasks_
Definition: Path.h:123
void reset()
Resets access to the resource so that added tasks will wait.
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:197
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:186
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:116
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:14
TrigResPtr trptr_
Definition: Path.h:115
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:50
accept
Definition: HLTenums.h:19
int bitpos_
Definition: Path.h:114
Definition: Path.h:44
std::list< std::string > const & context() const
Definition: Exception.cc:147
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:169
WorkersInPath workers_
Definition: Path.h:119
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:116
void presetTaskAsFailed(std::exception_ptr iExcept)
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:152
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:15
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void addContext(std::string const &context)
Definition: Exception.cc:165
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:64
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:170
ExceptionToActionTable const * act_table_
Definition: Path.h:117
State state_
Definition: Path.h:112
#define begin
Definition: vmac.h:32
HLT enums.
Worker const * getWorker(size_type i) const
Definition: Path.h:96
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetupImpl const &iES, StreamID const &streamID)
Definition: Path.cc:284
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetupImpl const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:316
int timesPassed_
Definition: Path.h:108
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:124
int timesRun_
Definition: Path.h:107
def branchType(schema, name)
Definition: revisionDML.py:114
Worker * pathStatusInserterWorker_
Definition: Path.h:127
void clearCounters()
Definition: Path.cc:164
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetupImpl const &iES, ServiceToken const &iToken, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:221
virtual Exception * clone() const
Definition: Exception.cc:181
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:126