CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
11 
12 #include <algorithm>
13 
14 namespace edm {
// Primary constructor for a Path.
// NOTE: this listing omits original line 18; the index entry for Path::Path
// shows the missing parameter as `ExceptionToActionTable const& actions`,
// which is the `actions` whose address is stored in act_table_ below.
//
// The four run counters are value-initialised, the state starts as hlt::Ready,
// the remaining arguments are stored as given, pathContext_ is built from
// (path_name, streamContext, bitpos, pathType) and both path-status-inserter
// pointers start out null.
15  Path::Path(int bitpos, std::string const& path_name,
16  WorkersInPath const& workers,
17  TrigResPtr trptr,
19  std::shared_ptr<ActivityRegistry> areg,
20  StreamContext const* streamContext,
21  std::atomic<bool>* stopProcessingEvent,
22  PathContext::PathType pathType) :
23  timesRun_(),
24  timesPassed_(),
25  timesFailed_(),
26  timesExcept_(),
27  state_(hlt::Ready),
28  bitpos_(bitpos),
29  trptr_(trptr),
30  actReg_(areg),
31  act_table_(&actions),
32  workers_(workers),
33  pathContext_(path_name, streamContext, bitpos, pathType),
34  stopProcessingEvent_(stopProcessingEvent),
35  pathStatusInserter_(nullptr),
36  pathStatusInserterWorker_(nullptr) {
37 
    // workers_ is a copy of the argument; point every copied WorkerInPath at
    // the PathContext owned by this Path.
38  for (auto& workerInPath : workers_) {
39  workerInPath.setPathContext(&pathContext_);
40  }
41  }
42 
// Copy constructor.
// NOTE: this listing omits several initializer lines (original 44-47, 52 and
// 54-58); only the members visible below are documented here.
// state_, bitpos_, trptr_, actReg_ and workers_ are copied from r.
43  Path::Path(Path const& r) :
48  state_(r.state_),
49  bitpos_(r.bitpos_),
50  trptr_(r.trptr_),
51  actReg_(r.actReg_),
53  workers_(r.workers_),
59 
    // The copied WorkerInPath objects are re-pointed at this object's own
    // pathContext_ instead of keeping the pointer they were copied with.
60  for (auto& workerInPath : workers_) {
61  workerInPath.setPathContext(&pathContext_);
62  }
63  }
64 
65 
// Path::handleWorkerFailure -- decides what to do after a worker threw.
// NOTE: this listing omits original lines 67, 71, 83, 85, 90, 92, 98 and 101
// (the first signature line, the lookup of `action`, the case labels and the
// definition of `pNF`). Per the index, the full signature is
//   bool handleWorkerFailure(cms::Exception& e, int nwrwue, bool isEvent,
//                            bool begin, BranchType branchType,
//                            ModuleDescription const&, std::string const& id)
//
// Returns should_continue: true unless a visible case clears it. The default
// case never returns normally because it ends in e.raise().
66  bool
68  int nwrwue,
69  bool isEvent,
70  bool begin,
72  ModuleDescription const& desc,
73  std::string const& id) {
    // Only add path-level context when the exception carries none yet.
74  if(e.context().empty()) {
75  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
76  }
77  bool should_continue = true;
78 
79  // there is no support as of yet for specific paths having
80  // different exception behavior
81 
82  // If not processing an event, always rethrow.
84  switch(action) {
    // (case label on omitted line 85) stop this path and print a "FailPath" warning.
86  should_continue = false;
87  edm::printCmsExceptionWarning("FailPath", e);
88  break;
89  }
91  //Need the other Paths to stop as soon as possible
    // (guard on omitted line 92) raise the stop flag through the stored pointer.
93  *stopProcessingEvent_ = true;
94  }
95  }
96  default: {
    // Count the exception for events, then record the path status
    // (original line 98, between these two statements, is not shown).
97  if (isEvent) ++timesExcept_;
99  recordStatus(nwrwue, isEvent);
100  if (action == exception_actions::Rethrow) {
    // `pNF` is defined on omitted line 101; when the exception category equals
    // it, a configuration hint is attached before rethrowing.
102  if (e.category() == pNF) {
103  std::ostringstream ost;
104  ost << "If you wish to continue processing events after a " << pNF << " exception,\n" <<
105  "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
106  e.addAdditionalInfo(ost.str());
107  }
108  }
109  //throw will copy which will slice the object
110  e.raise();
111  }
112  }
113 
114  return should_continue;
115  }
116 
// Path::exceptionContext -- appends two context strings to an exception.
// NOTE: this listing omits original lines 118 and 121 (the first signature
// line and the branchType parameter). Per the index, the full signature is
//   static void exceptionContext(cms::Exception& ex, bool isEvent, bool begin,
//                                BranchType branchType, ModuleDescription const&,
//                                std::string const& id, PathContext const&)
//
// First context : "Running path '<pathContext.pathName()>'"
// Second context: "Processing <transition> <id>", where <transition> is chosen
//                 from begin/branchType/isEvent. If no branch matches, the
//                 text is just "Processing <id>".
// `desc` is not used in the visible body.
117  void
119  bool isEvent,
120  bool begin,
122  ModuleDescription const& desc,
123  std::string const& id,
124  PathContext const& pathContext) {
125  std::ostringstream ost;
126  ost << "Running path '" << pathContext.pathName() << "'";
127  ex.addContext(ost.str());
    // Reuse the same stream for the second message.
128  ost.str("");
129  ost << "Processing ";
130  //For the event case, the Worker has already
131  // added the necessary module context to the exception
132  if (begin && branchType == InRun) {
    // Trailing space added so `id` is not glued onto "Run"; this matches the
    // three sibling branches below.
133  ost << "stream begin Run ";
134  }
135  else if (begin && branchType == InLumi) {
136  ost << "stream begin LuminosityBlock ";
137  }
138  else if (!begin && branchType == InLumi) {
139  ost << "stream end LuminosityBlock ";
140  }
141  else if (!begin && branchType == InRun) {
142  ost << "stream end Run ";
143  }
144  else if (isEvent) {
145  // It should be impossible to get here ...
146  ost << "Event ";
147  }
148  ost << id;
149  ex.addContext(ost.str());
150  }
151 
// Stores HLTPathStatus(state_, nwrwue) into slot bitpos_ of the object that
// trptr_ points to. Does nothing unless isEvent is true and trptr_ is non-null.
152  void
153  Path::recordStatus(int nwrwue, bool isEvent) {
154  // Guard: nothing to record outside event processing or without a results object.
155  if (!isEvent || !trptr_) { return; }
156  (*trptr_)[bitpos_] = HLTPathStatus(state_, nwrwue);
157  }
158 
// Sets state_ to hlt::Pass or hlt::Fail according to `success`. The matching
// counter (timesPassed_ / timesFailed_) is incremented only when isEvent is true.
159  void
160  Path::updateCounters(bool success, bool isEvent) {
161  if (!success) {
162  if (isEvent) { ++timesFailed_; }
163  state_ = hlt::Fail;
164  } else {
165  if (isEvent) { ++timesPassed_; }
166  state_ = hlt::Pass;
167  }
168  }
169 
// Path::clearCounters (per the index: `void clearCounters()`).
// NOTE: this listing omits original lines 171 (the signature) and 173; only
// the visible statements are described here.
170  void
172  using std::placeholders::_1;
    // Invoke WorkerInPath::clearCounters on every element of workers_.
174  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
175  }
176 
// Rebuilds earlyDeleteHelpers_ from iWorkerToDeleter: for each worker index in
// [0, size()) whose getWorker(index) appears as a key in the map, the mapped
// helper is collected and its addedToPath() is called. The collected helpers
// then replace the previous contents of earlyDeleteHelpers_.
177  void
178  Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
179  // Gather into a generously reserved scratch vector first, then copy into a
180  // second vector so the stored one is built from exactly the helpers found.
181  std::vector<EarlyDeleteHelper*> oversized;
182  oversized.reserve(iWorkerToDeleter.size());
183  for (unsigned int i = 0; i != size(); ++i) {
184  auto const match = iWorkerToDeleter.find(getWorker(i));
185  if (match == iWorkerToDeleter.end()) { continue; }
186  EarlyDeleteHelper* const helper = match->second;
187  oversized.push_back(helper);
188  helper->addedToPath();
189  }
190  std::vector<EarlyDeleteHelper*> exactFit(oversized.begin(), oversized.end());
191  earlyDeleteHelpers_.swap(exactFit);
192  }
193 
// Path::setPathStatusInserter -- stores the two pointers it is given.
// NOTE: this listing omits original line 195; per the index the signature is
//   void setPathStatusInserter(PathStatusInserter* pathStatusInserter,
//                              Worker* pathStatusInserterWorker)
194  void
196  Worker* pathStatusInserterWorker) {
197  pathStatusInserter_ = pathStatusInserter;
198  pathStatusInserterWorker_ = pathStatusInserterWorker;
199  }
200 
// Path::handleEarlyFinish -- calls pathFinished(iEvent) on every entry of
// earlyDeleteHelpers_.
// NOTE: this listing omits original line 202; per the index the signature is
//   void handleEarlyFinish(EventPrincipal const&)
200  void
203  for(auto helper: earlyDeleteHelpers_) {
204  helper->pathFinished(iEvent);
205  }
206  }
207 
// Path::processOneOccurrenceAsync -- starts running this path for one event.
// NOTE: this listing omits original lines 209 and 214; per the index the
// signature is
//   void processOneOccurrenceAsync(WaitingTask*, EventPrincipal const&,
//                                  EventSetup const&, StreamID const&,
//                                  StreamContext const*)
// Visible behaviour: bump timesRun_, register iTask on waitingTasks_, emit the
// pre-path signal when an ActivityRegistry is set, reset the state to
// hlt::Ready, then either finish immediately (no workers) or launch worker 0.
207  void
210  EventPrincipal const& iEP,
211  EventSetup const& iES,
212  StreamID const& iStreamID,
213  StreamContext const* iStreamContext) {
215  ++timesRun_;
216  waitingTasks_.add(iTask);
217  if(actReg_) {
218  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
219  }
220  state_ = hlt::Ready;
221 
    // An empty path finishes straight away: module index -1, success flag
    // true, and an empty exception_ptr.
222  if(workers_.empty()) {
223  finished(-1, true, std::exception_ptr(), iStreamContext, iEP, iES, iStreamID);
224  return;
225  }
226 
227  runNextWorkerAsync(0,iEP,iES,iStreamID, iStreamContext);
228  }
229 
// Runs after the worker at iModuleIndex has completed. It decides whether to
// start the next worker or to finish the path.
//   iException   - null when the worker succeeded, otherwise points at the
//                  exception it produced.
//   iModuleIndex - index into workers_ of the worker that just ran.
//   iEP/iES/iID/iContext - passed through to the next step unchanged.
// NOTE: this listing omits original line 261 (the condition that guards the
// `shouldContinue = false;` at original line 262).
230  void
231  Path::workerFinished(std::exception_ptr const* iException,
232  unsigned int iModuleIndex,
233  EventPrincipal const& iEP, EventSetup const& iES,
234  StreamID const& iID, StreamContext const* iContext) {
235 
236  //This call also allows the WorkerInPath to update statistics
237  // so should be done even if an exception happened
238  auto& worker = workers_[iModuleIndex];
239  bool shouldContinue = worker.checkResultsOfRunWorker(true);
240  std::exception_ptr finalException;
241  if(iException) {
    // Recover a mutable copy of the exception by rethrowing and cloning it.
    // Only cms::Exception is caught here; any other type propagates out of
    // this function.
242  std::unique_ptr<cms::Exception> pEx;
243  try {
244  std::rethrow_exception(*iException);
245  } catch(cms::Exception& oldEx) {
246  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
247  }
248  try {
249  std::ostringstream ost;
250  ost << iEP.id();
251  shouldContinue = handleWorkerFailure(*pEx, iModuleIndex, /*isEvent*/ true, /*isBegin*/ true, InEvent,
252  worker.getWorker()->description(), ost.str());
253  //If we didn't rethrow, then we effectively skipped
254  worker.skipWorker(iEP);
255  finalException = std::exception_ptr();
256  } catch(...) {
    // handleWorkerFailure threw: stop the path and keep the exception so it
    // can be handed to finished() below.
257  shouldContinue = false;
258  finalException = std::current_exception();
259  }
260  }
262  shouldContinue = false;
263  }
264  auto const nextIndex = iModuleIndex +1;
265  if (shouldContinue and nextIndex < workers_.size()) {
266  runNextWorkerAsync(nextIndex, iEP, iES, iID, iContext);
267  return;
268  }
269 
270  if (not shouldContinue) {
271  //we are leaving the path early
    // Mark every worker after the current one as skipped.
272  for(auto it = workers_.begin()+nextIndex, itEnd=workers_.end();
273  it != itEnd; ++it) {
274  it->skipWorker(iEP);
275  }
276  handleEarlyFinish(iEP);
277  }
278  finished(iModuleIndex, shouldContinue, finalException, iContext, iEP, iES, iID);
279  }
280 
// Final step of running the path for one event.
//   iModuleIndex - index passed to recordStatus and used to build the
//                  HLTPathStatus (-1 when the path had no workers).
//   iSucceeded   - forwarded to updateCounters when no exception is pending.
//   iException   - pending exception, taken by value. It may be replaced
//                  below, but only when it is empty on entry.
// Whatever exception_ptr results is handed to waitingTasks_.doneWaiting().
// NOTE: this listing omits original lines 298-299 (the start of the call whose
// arguments appear at original line 300).
281  void
282  Path::finished(int iModuleIndex, bool iSucceeded, std::exception_ptr iException, StreamContext const* iContext,
283  EventPrincipal const& iEP,
284  EventSetup const& iES,
285  StreamID const& streamID) {
286 
    // Counters and recorded status are updated only when no exception is pending.
287  if(not iException) {
288  updateCounters(iSucceeded, true);
289  recordStatus(iModuleIndex, true);
290  }
291  try {
292  HLTPathStatus status(state_, iModuleIndex);
293 
294  if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths
295  pathStatusInserter_->setPathStatus(streamID, status);
296  }
297  std::exception_ptr jException =
300  iEP, iES, streamID, ParentContext(iContext), iContext
301  );
    // An exception from the call above is kept only if none was already pending.
302  if(jException && not iException) {
303  iException = jException;
304  }
    // NOTE(review): actReg_ is dereferenced here without a null check, whereas
    // processOneOccurrenceAsync tests `if(actReg_)` first -- confirm that
    // actReg_ can never be null at this point.
305  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
306  } catch(...) {
    // Anything thrown inside the try block is captured rather than propagated;
    // the first exception seen wins.
307  if(not iException) {
308  iException = std::current_exception();
309  }
310  }
311  waitingTasks_.doneWaiting(iException);
312  }
313 
// Starts the worker at iNextModuleIndex and arranges for workerFinished() to
// be called with the same index and arguments once that worker's task runs.
// NOTE: this listing omits original line 330 (the template argument and the
// first call arguments of runWorkerAsync).
314  void
315  Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
316  EventPrincipal const& iEP, EventSetup const& iES,
317  StreamID const& iID, StreamContext const* iContext) {
318 
319  //need to make sure Service system is activated on the reading thread
320  auto token = ServiceRegistry::instance().presentToken();
321 
    // The lambda captures iEP and iES by reference and `this` by pointer, so
    // those objects must outlive the task -- NOTE(review): presumably
    // guaranteed by the caller; confirm.
322  auto nextTask = make_waiting_task( tbb::task::allocate_root(),
323  [this, iNextModuleIndex, &iEP,&iES, iID, iContext, token](std::exception_ptr const* iException)
324  {
    // Re-activate the captured service token for the duration of the callback.
325  ServiceRegistry::Operate guard(token);
326  this->workerFinished(iException, iNextModuleIndex, iEP,iES,iID,iContext);
327  });
328 
329  workers_[iNextModuleIndex].runWorkerAsync<
331  iEP,
332  iES,
333  iID,
334  iContext);
335  }
336 
337 }
std::string const & pathName() const
Definition: PathContext.h:37
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:153
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:202
Definition: helper.py:1
not [yet] run
Definition: HLTenums.h:18
roAction_t actions[nactions]
Definition: GenABIO.cc:187
static const std::string & codeToString(Code)
------------- implementation details -----------------
Definition: EDMException.cc:51
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:119
int timesFailed_
Definition: Path.h:108
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:48
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
reject
Definition: HLTenums.h:20
void raise()
Definition: Exception.h:104
void setPathStatus(StreamID const &, HLTPathStatus const &)
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:109
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:315
PathContext pathContext_
Definition: Path.h:121
size_type size() const
Definition: Path.h:88
WaitingTaskList waitingTasks_
Definition: Path.h:122
#define nullptr
void reset()
Resets access to the resource so that added tasks will wait.
ServiceToken presentToken() const
std::exception_ptr runModuleDirectly(typename T::MyPrincipal const &ep, EventSetup const &es, StreamID streamID, ParentContext const &parentContext, typename T::Context const *context)
Definition: Worker.h:876
BranchType
Definition: BranchType.h:11
void setPathStatusInserter(PathStatusInserter *pathStatusInserter, Worker *pathStatusInserterWorker)
Definition: Path.cc:195
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:115
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
TrigResPtr trptr_
Definition: Path.h:114
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:231
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:50
accept
Definition: HLTenums.h:19
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:209
int bitpos_
Definition: Path.h:113
Definition: Path.h:44
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
WorkersInPath workers_
Definition: Path.h:118
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:118
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &streamID)
Definition: Path.cc:282
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:160
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:15
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:67
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:178
ExceptionToActionTable const * act_table_
Definition: Path.h:116
State state_
Definition: Path.h:111
#define begin
Definition: vmac.h:30
HLT enums.
Worker const * getWorker(size_type i) const
Definition: Path.h:93
int timesPassed_
Definition: Path.h:107
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:123
int timesRun_
Definition: Path.h:106
def branchType(schema, name)
Definition: revisionDML.py:112
Worker * pathStatusInserterWorker_
Definition: Path.h:126
void clearCounters()
Definition: Path.cc:171
virtual Exception * clone() const
Definition: Exception.cc:259
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)
PathStatusInserter * pathStatusInserter_
Definition: Path.h:125