CMS 3D CMS Logo

Path.cc
Go to the documentation of this file.
1 
8 
9 #include <algorithm>
10 
11 namespace edm {
12  Path::Path(int bitpos, std::string const& path_name,
13  WorkersInPath const& workers,
14  TrigResPtr trptr,
16  std::shared_ptr<ActivityRegistry> areg,
17  StreamContext const* streamContext,
18  std::atomic<bool>* stopProcessingEvent,
19  PathContext::PathType pathType) :
20  timesRun_(),
21  timesPassed_(),
22  timesFailed_(),
23  timesExcept_(),
24  state_(hlt::Ready),
25  bitpos_(bitpos),
26  trptr_(trptr),
27  actReg_(areg),
28  act_table_(&actions),
29  workers_(workers),
30  pathContext_(path_name, streamContext, bitpos, pathType),
31  stopProcessingEvent_(stopProcessingEvent){
32 
33  for (auto& workerInPath : workers_) {
34  workerInPath.setPathContext(&pathContext_);
35  }
36  }
37 
38  Path::Path(Path const& r) :
43  state_(r.state_),
44  bitpos_(r.bitpos_),
45  trptr_(r.trptr_),
46  actReg_(r.actReg_),
48  workers_(r.workers_),
52 
53  for (auto& workerInPath : workers_) {
54  workerInPath.setPathContext(&pathContext_);
55  }
56  }
57 
58 
59  bool
61  int nwrwue,
62  bool isEvent,
63  bool begin,
65  ModuleDescription const& desc,
66  std::string const& id) {
67  if(e.context().empty()) {
68  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
69  }
70  bool should_continue = true;
71 
72  // there is no support as of yet for specific paths having
73  // different exception behavior
74 
75  // If not processing an event, always rethrow.
77  switch(action) {
79  should_continue = false;
80  edm::printCmsExceptionWarning("FailPath", e);
81  break;
82  }
84  //Need the other Paths to stop as soon as possible
86  *stopProcessingEvent_ = true;
87  }
88  }
89  default: {
90  if (isEvent) ++timesExcept_;
92  recordStatus(nwrwue, isEvent);
93  if (action == exception_actions::Rethrow) {
95  if (e.category() == pNF) {
96  std::ostringstream ost;
97  ost << "If you wish to continue processing events after a " << pNF << " exception,\n" <<
98  "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
99  e.addAdditionalInfo(ost.str());
100  }
101  }
102  //throw will copy which will slice the object
103  e.raise();
104  }
105  }
106 
107  return should_continue;
108  }
109 
110  void
112  bool isEvent,
113  bool begin,
115  ModuleDescription const& desc,
116  std::string const& id,
117  PathContext const& pathContext) {
118  std::ostringstream ost;
119  ost << "Running path '" << pathContext.pathName() << "'";
120  ex.addContext(ost.str());
121  ost.str("");
122  ost << "Processing ";
123  //For the event case, the Worker has already
124  // added the necessary module context to the exception
125  if (begin && branchType == InRun) {
126  ost << "stream begin Run";
127  }
128  else if (begin && branchType == InLumi) {
129  ost << "stream begin LuminosityBlock ";
130  }
131  else if (!begin && branchType == InLumi) {
132  ost << "stream end LuminosityBlock ";
133  }
134  else if (!begin && branchType == InRun) {
135  ost << "stream end Run ";
136  }
137  else if (isEvent) {
138  // It should be impossible to get here ...
139  ost << "Event ";
140  }
141  ost << id;
142  ex.addContext(ost.str());
143  }
144 
145  void
146  Path::recordStatus(int nwrwue, bool isEvent) {
147  if(isEvent && trptr_) {
148  (*trptr_)[bitpos_]=HLTPathStatus(state_, nwrwue);
149  }
150  }
151 
152  void
153  Path::updateCounters(bool success, bool isEvent) {
154  if (success) {
155  if (isEvent) ++timesPassed_;
156  state_ = hlt::Pass;
157  } else {
158  if(isEvent) ++timesFailed_;
159  state_ = hlt::Fail;
160  }
161  }
162 
163  void
165  using std::placeholders::_1;
167  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
168  }
169 
170  void
171  Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
172  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
173  // have to use the space necessary
174  std::vector<EarlyDeleteHelper*> temp;
175  temp.reserve(iWorkerToDeleter.size());
176  for(unsigned int index=0; index !=size();++index) {
177  auto found = iWorkerToDeleter.find(getWorker(index));
178  if(found != iWorkerToDeleter.end()) {
179  temp.push_back(found->second);
180  found->second->addedToPath();
181  }
182  }
183  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(),temp.end());
184  earlyDeleteHelpers_.swap(tempCorrectSize);
185  }
186 
187  void
189  for(auto helper: earlyDeleteHelpers_) {
190  helper->pathFinished(iEvent);
191  }
192  }
193 
194  void
196  EventPrincipal const& iEP,
197  EventSetup const& iES,
198  StreamID const& iStreamID,
199  StreamContext const* iStreamContext) {
201  ++timesRun_;
202  waitingTasks_.add(iTask);
203  if(actReg_) {
204  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
205  }
206  state_ = hlt::Ready;
207 
208  if(workers_.empty()) {
209  finished(-1, true, std::exception_ptr(), iStreamContext);
210  return;
211  }
212 
213  runNextWorkerAsync(0,iEP,iES,iStreamID, iStreamContext);
214  }
215 
216  void
217  Path::workerFinished(std::exception_ptr const* iException,
218  unsigned int iModuleIndex,
219  EventPrincipal const& iEP, EventSetup const& iES,
220  StreamID const& iID, StreamContext const* iContext) {
221 
222  //This call also allows the WorkerInPath to update statistics
223  // so should be done even if an exception happened
224  auto& worker = workers_[iModuleIndex];
225  bool shouldContinue = worker.checkResultsOfRunWorker(true);
226  std::exception_ptr finalException;
227  if(iException) {
228  std::unique_ptr<cms::Exception> pEx;
229  try {
230  std::rethrow_exception(*iException);
231  } catch(cms::Exception& oldEx) {
232  pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
233  }
234  try {
235  std::ostringstream ost;
236  ost << iEP.id();
237  shouldContinue = handleWorkerFailure(*pEx, iModuleIndex, /*isEvent*/ true, /*isBegin*/ true, InEvent,
238  worker.getWorker()->description(), ost.str());
239  //If we didn't rethrow, then we effectively skipped
240  worker.skipWorker(iEP);
241  finalException = std::exception_ptr();
242  } catch(...) {
243  shouldContinue = false;
244  finalException = std::current_exception();
245  }
246  }
248  shouldContinue = false;
249  }
250  auto const nextIndex = iModuleIndex +1;
251  if (shouldContinue and nextIndex < workers_.size()) {
252  runNextWorkerAsync(nextIndex, iEP, iES, iID, iContext);
253  return;
254  }
255 
256  if (not shouldContinue) {
257  //we are leaving the path early
258  for(auto it = workers_.begin()+nextIndex, itEnd=workers_.end();
259  it != itEnd; ++it) {
260  it->skipWorker(iEP);
261  }
262  handleEarlyFinish(iEP);
263  }
264  finished(iModuleIndex, shouldContinue, finalException, iContext);
265  }
266 
267  void
268  Path::finished(int iModuleIndex, bool iSucceeded, std::exception_ptr iException, StreamContext const* iContext) {
269 
270  if(not iException) {
271  updateCounters(iSucceeded, true);
272  recordStatus(iModuleIndex, true);
273  }
274  try {
275  HLTPathStatus status(state_, iModuleIndex);
276  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
277  } catch(...) {
278  if(not iException) {
279  iException = std::current_exception();
280  }
281  }
282  waitingTasks_.doneWaiting(iException);
283  }
284 
285  void
286  Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
287  EventPrincipal const& iEP, EventSetup const& iES,
288  StreamID const& iID, StreamContext const* iContext) {
289 
290  //need to make sure Service system is activated on the reading thread
291  auto token = ServiceRegistry::instance().presentToken();
292 
293  auto nextTask = make_waiting_task( tbb::task::allocate_root(),
294  [this, iNextModuleIndex, &iEP,&iES, iID, iContext, token](std::exception_ptr const* iException)
295  {
296  ServiceRegistry::Operate guard(token);
297  this->workerFinished(iException, iNextModuleIndex, iEP,iES,iID,iContext);
298  });
299 
300  workers_[iNextModuleIndex].runWorkerAsync<
302  iEP,
303  iES,
304  iID,
305  iContext);
306  }
307 
308 }
std::string const & pathName() const
Definition: PathContext.h:37
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:146
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:188
Definition: helper.py:1
not [yet] run
Definition: HLTenums.h:18
roAction_t actions[nactions]
Definition: GenABIO.cc:187
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:50
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:114
int timesFailed_
Definition: Path.h:103
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:46
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
reject
Definition: HLTenums.h:20
void raise()
Definition: Exception.h:105
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:104
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:286
PathContext pathContext_
Definition: Path.h:116
size_type size() const
Definition: Path.h:86
WaitingTaskList waitingTasks_
Definition: Path.h:117
void reset()
Resets access to the resource so that added tasks will wait.
ServiceToken presentToken() const
BranchType
Definition: BranchType.h:11
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:110
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
TrigResPtr trptr_
Definition: Path.h:109
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:217
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:48
accept
Definition: HLTenums.h:19
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:195
int bitpos_
Definition: Path.h:108
Definition: Path.h:42
std::list< std::string > const & context() const
Definition: Exception.cc:191
static ServiceRegistry & instance()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
WorkersInPath workers_
Definition: Path.h:113
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:111
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:153
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:12
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:60
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:171
ExceptionToActionTable const * act_table_
Definition: Path.h:111
State state_
Definition: Path.h:106
#define begin
Definition: vmac.h:30
HLT enums.
Worker const * getWorker(size_type i) const
Definition: Path.h:91
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *)
Definition: Path.cc:268
int timesPassed_
Definition: Path.h:102
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:118
int timesRun_
Definition: Path.h:101
def branchType(schema, name)
Definition: revisionDML.py:112
void clearCounters()
Definition: Path.cc:164
virtual Exception * clone() const
Definition: Exception.cc:259
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)