CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
11 
12 #include <algorithm>
13 
14 namespace edm {
// Builds a Path for trigger bit `bitpos` over the supplied workers. The four
// times*_ counters are value-initialized, state_ starts at hlt::Ready, and both
// path-status-inserter pointers start as nullptr.
// NOTE(review): `act_table_(&actions)` uses a name `actions` that does not
// appear in the parameter lines visible in this listing (its numbering skips
// line 18) -- confirm the full parameter list against the header.
15  Path::Path(int bitpos, std::string const& path_name,
16  WorkersInPath const& workers,
17  TrigResPtr trptr,
19  std::shared_ptr<ActivityRegistry> areg,
20  StreamContext const* streamContext,
21  std::atomic<bool>* stopProcessingEvent,
22  PathContext::PathType pathType) :
23  timesRun_(),
24  timesPassed_(),
25  timesFailed_(),
26  timesExcept_(),
27  state_(hlt::Ready),
28  bitpos_(bitpos),
29  trptr_(trptr),
30  actReg_(areg),
31  act_table_(&actions),
32  workers_(workers),
33  pathContext_(path_name, streamContext, bitpos, pathType),
34  stopProcessingEvent_(stopProcessingEvent),
35  pathStatusInserter_(nullptr),
36  pathStatusInserterWorker_(nullptr) {
37 
// workers_ is a copy of the argument; point each copied WorkerInPath at this
// Path's own pathContext_.
38  for (auto& workerInPath : workers_) {
39  workerInPath.setPathContext(&pathContext_);
40  }
41  }
42 
// Copy constructor. The initializers visible here copy state_, bitpos_,
// trptr_, actReg_ and workers_ from `r`.
// NOTE(review): several initializer lines are absent from this listing (its
// numbering jumps 43->48, 51->53 and 53->59), so how the remaining members are
// initialized cannot be read from here.
43  Path::Path(Path const& r) :
48  state_(r.state_),
49  bitpos_(r.bitpos_),
50  trptr_(r.trptr_),
51  actReg_(r.actReg_),
53  workers_(r.workers_),
59 
// The copied workers are pointed at this object's pathContext_ (not at the
// one owned by `r`).
60  for (auto& workerInPath : workers_) {
61  workerInPath.setPathContext(&pathContext_);
62  }
63  }
64 
65 
// Reacts to an exception `e` raised while running a worker in this path and
// returns whether the path should keep running (`should_continue`).
// NOTE(review): this listing is missing lines (67, 71, 83, 85, 90, 92, 98,
// 101): the function-name line, the `branchType` parameter, the computation of
// `action`, the case labels and the definition of `pNF` are not visible here.
66  bool
68  int nwrwue,
69  bool isEvent,
70  bool begin,
72  ModuleDescription const& desc,
73  std::string const& id) {
// Add path-level context only when the exception carries no context yet.
74  if(e.context().empty()) {
75  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
76  }
77  bool should_continue = true;
78 
79  // there is no support as of yet for specific paths having
80  // different exception behavior
81 
82  // If not processing an event, always rethrow.
84  switch(action) {
// This branch stops the path without rethrowing and logs a "FailPath" warning.
86  should_continue = false;
87  edm::printCmsExceptionWarning("FailPath", e);
88  break;
89  }
91  //Need the other Paths to stop as soon as possible
93  *stopProcessingEvent_ = true;
94  }
95  }
// Default handling: count the exception (events only), record the path status,
// optionally attach a configuration hint, then re-raise the exception.
96  default: {
97  if (isEvent) ++timesExcept_;
99  recordStatus(nwrwue, isEvent);
100  if (action == exception_actions::Rethrow) {
// When the category equals pNF, tell the user how to configure SkipEvent.
102  if (e.category() == pNF) {
103  std::ostringstream ost;
104  ost << "If you wish to continue processing events after a " << pNF << " exception,\n" <<
105  "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
106  e.addAdditionalInfo(ost.str());
107  }
108  }
109  //throw will copy which will slice the object
110  e.raise();
111  }
112  }
113 
114  return should_continue;
115  }
116 
117  void
119  bool isEvent,
120  bool begin,
122  ModuleDescription const& desc,
123  std::string const& id,
124  PathContext const& pathContext) {
125  std::ostringstream ost;
126  ost << "Running path '" << pathContext.pathName() << "'";
127  ex.addContext(ost.str());
128  ost.str("");
129  ost << "Processing ";
130  //For the event case, the Worker has already
131  // added the necessary module context to the exception
132  if (begin && branchType == InRun) {
133  ost << "stream begin Run";
134  }
135  else if (begin && branchType == InLumi) {
136  ost << "stream begin LuminosityBlock ";
137  }
138  else if (!begin && branchType == InLumi) {
139  ost << "stream end LuminosityBlock ";
140  }
141  else if (!begin && branchType == InRun) {
142  ost << "stream end Run ";
143  }
144  else if (isEvent) {
145  // It should be impossible to get here ...
146  ost << "Event ";
147  }
148  ost << id;
149  ex.addContext(ost.str());
150  }
151 
152  void
153  Path::recordStatus(int nwrwue, bool isEvent) {
154  if(isEvent && trptr_) {
155  (*trptr_)[bitpos_]=HLTPathStatus(state_, nwrwue);
156  }
157  }
158 
159  void
160  Path::updateCounters(bool success, bool isEvent) {
161  if (success) {
162  if (isEvent) ++timesPassed_;
163  state_ = hlt::Pass;
164  } else {
165  if(isEvent) ++timesFailed_;
166  state_ = hlt::Fail;
167  }
168  }
169 
// Calls WorkerInPath::clearCounters on every element of workers_.
// NOTE(review): lines 171 and 173 are absent from this listing, so the
// function-name line and one body statement are not visible here.
170  void
172  using std::placeholders::_1;
174  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
175  }
176 
177  void
178  Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
179  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
180  // have to use the space necessary
181  std::vector<EarlyDeleteHelper*> temp;
182  temp.reserve(iWorkerToDeleter.size());
183  for(unsigned int index=0; index !=size();++index) {
184  auto found = iWorkerToDeleter.find(getWorker(index));
185  if(found != iWorkerToDeleter.end()) {
186  temp.push_back(found->second);
187  found->second->addedToPath();
188  }
189  }
190  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(),temp.end());
191  earlyDeleteHelpers_.swap(tempCorrectSize);
192  }
193 
// Stores the two pointers passed in into pathStatusInserter_ and
// pathStatusInserterWorker_.
// NOTE(review): line 195 (function name and first parameter) is absent from
// this listing.
194  void
196  Worker* pathStatusInserterWorker) {
197  pathStatusInserter_ = pathStatusInserter;
198  pathStatusInserterWorker_ = pathStatusInserterWorker;
199  }
200 
// Calls pathFinished(iEvent) on every helper in earlyDeleteHelpers_.
// NOTE(review): line 202 (the signature line declaring iEvent) is absent from
// this listing.
201  void
203  for(auto helper: earlyDeleteHelpers_) {
204  helper->pathFinished(iEvent);
205  }
206  }
207 
// Starts running this path: registers iTask with waitingTasks_, then either
// finishes immediately (no workers) or starts the first worker.
// NOTE(review): lines 209 and 215 are absent from this listing, so the
// function-name line (which declares iTask) and one statement before the
// counter increment are not visible here.
208  void
210  EventPrincipal const& iEP,
211  EventSetup const& iES,
212  ServiceToken const& iToken,
213  StreamID const& iStreamID,
214  StreamContext const* iStreamContext) {
216  ++timesRun_;
217  waitingTasks_.add(iTask);
// Emit the pre-path signal only when an ActivityRegistry is present; the
// Operate guard is constructed from iToken for the duration of the call.
218  if(actReg_) {
219  ServiceRegistry::Operate guard(iToken);
220  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
221  }
222  state_ = hlt::Ready;
223 
// A path with no workers is finished right away, reported as succeeded with
// module index -1 and no exception.
224  if(workers_.empty()) {
225  ServiceRegistry::Operate guard(iToken);
226  finished(-1, true, std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
227  return;
228  }
229 
// Otherwise start with the worker at index 0.
230  runNextWorkerAsync(0,iEP,iES, iToken, iStreamID, iStreamContext);
231  }
232 
// Called after the worker at iModuleIndex has run. Decides whether to start
// the next worker, or to end the path (skipping the remaining workers when
// leaving early) and call finished().
// iException is null when the worker raised nothing; otherwise it points to
// the exception captured from the worker.
// NOTE(review): line 271 is absent from this listing, so the condition that
// guards the `shouldContinue = false;` on line 272 is not visible here.
233  void
234  Path::workerFinished(std::exception_ptr const* iException,
235  unsigned int iModuleIndex,
236  EventPrincipal const& iEP, EventSetup const& iES,
237  ServiceToken const& iToken,
238  StreamID const& iID, StreamContext const* iContext) {
239  ServiceRegistry::Operate guard(iToken);
240 
241  //This call also allows the WorkerInPath to update statistics
242  // so should be done even if an exception happened
243  auto& worker = workers_[iModuleIndex];
244  bool shouldContinue = worker.checkResultsOfRunWorker(true);
245  std::exception_ptr finalException;
246  if(iException) {
// Recover a mutable cms::Exception copy (via clone()) from the exception_ptr.
// An exception of any other type is not caught by this handler and leaves
// this function.
// NOTE(review): presumably callers only deliver cms::Exception here -- confirm.
247  std::unique_ptr<cms::Exception> pEx;
248  try {
249  std::rethrow_exception(*iException);
250  } catch(cms::Exception& oldEx) {
251  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
252  }
253  try {
// The event id, streamed to text, is passed as the `id` argument.
254  std::ostringstream ost;
255  ost << iEP.id();
256  shouldContinue = handleWorkerFailure(*pEx, iModuleIndex, /*isEvent*/ true, /*isBegin*/ true, InEvent,
257  worker.getWorker()->description(), ost.str());
258  //If we didn't rethrow, then we effectively skipped
259  worker.skipWorker(iEP);
260  finalException = std::exception_ptr();
261  } catch(...) {
// handleWorkerFailure re-raised: stop the path and keep the exception so it
// can be handed to finished().
262  shouldContinue = false;
263  finalException = std::current_exception();
264  //set the exception early to avoid case where another Path is waiting
265  // on a module in this Path and not running the module will lead to a
266  // different but related exception in the other Path. We want this
267  // Paths exception to be the one that gets reported.
268  waitingTasks_.presetTaskAsFailed(finalException);
269  }
270  }
272  shouldContinue = false;
273  }
274  auto const nextIndex = iModuleIndex +1;
// More workers remain and nothing asked to stop: chain to the next worker.
275  if (shouldContinue and nextIndex < workers_.size()) {
276  runNextWorkerAsync(nextIndex, iEP, iES, iToken, iID, iContext);
277  return;
278  }
279 
280  if (not shouldContinue) {
281  //we are leaving the path early
282  for(auto it = workers_.begin()+nextIndex, itEnd=workers_.end();
283  it != itEnd; ++it) {
284  it->skipWorker(iEP);
285  }
286  handleEarlyFinish(iEP);
287  }
288  finished(iModuleIndex, shouldContinue, finalException, iContext, iEP, iES,iID);
289  }
290 
// Final step for this path: updates counters and recorded status (only when
// no exception is pending), hands the path status to the PathStatusInserter
// if one is set, emits the post-path signal, and releases the tasks held in
// waitingTasks_, passing along the first exception seen (if any).
// NOTE(review): lines 308-309 are absent from this listing, so the call whose
// result initializes jException is not visible here.
// NOTE(review): actReg_ is dereferenced below without the `if(actReg_)` check
// that processOneOccurrenceAsync performs -- confirm it can never be null here.
291  void
292  Path::finished(int iModuleIndex, bool iSucceeded, std::exception_ptr iException, StreamContext const* iContext,
293  EventPrincipal const& iEP,
294  EventSetup const& iES,
295  StreamID const& streamID) {
296 
297  if(not iException) {
298  updateCounters(iSucceeded, true);
299  recordStatus(iModuleIndex, true);
300  }
// Anything thrown inside this try block is captured below rather than
// propagated, so doneWaiting() is always reached.
301  try {
302  HLTPathStatus status(state_, iModuleIndex);
303 
304  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
305  pathStatusInserter_->setPathStatus(streamID, status);
306  }
307  std::exception_ptr jException =
310  iEP, iES, streamID, ParentContext(iContext), iContext
311  );
// An exception already passed in takes precedence over jException.
312  if(jException && not iException) {
313  iException = jException;
314  }
315  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
316  } catch(...) {
317  if(not iException) {
318  iException = std::current_exception();
319  }
320  }
321  waitingTasks_.doneWaiting(iException);
322  }
323 
// Starts the worker at iNextModuleIndex and arranges for workerFinished() to
// be called with the same index once that worker's task completes.
// NOTE(review): line 337 is absent from this listing, so the template
// arguments and the first call arguments of runWorkerAsync are not visible.
324  void
325  Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
326  EventPrincipal const& iEP, EventSetup const& iES,
327  ServiceToken const& iToken,
328  StreamID const& iID, StreamContext const* iContext) {
329 
// The callback captures iEP and iES by reference and `this` by pointer, while
// the index, stream id, context pointer and service token are copied.
// NOTE(review): the referenced objects and this Path must therefore outlive
// the task -- confirm against the caller.
330  auto nextTask = make_waiting_task( tbb::task::allocate_root(),
331  [this, iNextModuleIndex, &iEP,&iES, iID, iContext, token=iToken](std::exception_ptr const* iException)
332  {
333  this->workerFinished(iException, iNextModuleIndex, iEP,iES,token,iID,iContext);
334  });
335 
336  workers_[iNextModuleIndex].runWorkerAsync<
338  iEP,
339  iES,
340  iToken,
341  iID,
342  iContext);
343  }
344 
345 }
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:209
std::string const & pathName() const
Definition: PathContext.h:37
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:153
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:202
Definition: helper.py:1
not [yet] run
Definition: HLTenums.h:18
roAction_t actions[nactions]
Definition: GenABIO.cc:187
static const std::string & codeToString(Code)
-------------- implementation details ------------------
Definition: EDMException.cc:53
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:117
int timesFailed_
Definition: Path.h:106
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:48
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
reject
Definition: HLTenums.h:20
void raise()
Definition: Exception.h:104
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:107
PathContext pathContext_
Definition: Path.h:119
size_type size() const
Definition: Path.h:86
WaitingTaskList waitingTasks_
Definition: Path.h:120
#define nullptr
void reset()
Resets access to the resource so that added tasks will wait.
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:1159
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:195
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:113
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, ServiceToken const &, StreamID const &, StreamContext const *)
Definition: Path.cc:325
TrigResPtr trptr_
Definition: Path.h:112
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:50
accept
Definition: HLTenums.h:19
int bitpos_
Definition: Path.h:111
Definition: Path.h:44
std::list< std::string > const & context() const
Definition: Exception.cc:191
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
WorkersInPath workers_
Definition: Path.h:116
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:118
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &streamID)
Definition: Path.cc:292
void presetTaskAsFailed(std::exception_ptr iExcept)
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:160
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:15
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:92
void addContext(std::string const &context)
Definition: Exception.cc:227
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:67
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:178
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, ServiceToken const &iToken, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:234
ExceptionToActionTable const * act_table_
Definition: Path.h:114
State state_
Definition: Path.h:109
#define begin
Definition: vmac.h:32
HLT enums.
Worker const * getWorker(size_type i) const
Definition: Path.h:91
int timesPassed_
Definition: Path.h:105
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:121
int timesRun_
Definition: Path.h:104
def branchType(schema, name)
Definition: revisionDML.py:113
Worker * pathStatusInserterWorker_
Definition: Path.h:124
void clearCounters()
Definition: Path.cc:171
virtual Exception * clone() const
Definition: Exception.cc:259
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:123