CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Path.cc
Go to the documentation of this file.
1 
8 
9 #include <algorithm>
10 
11 namespace edm {
12  Path::Path(int bitpos, std::string const& path_name,
13  WorkersInPath const& workers,
14  TrigResPtr trptr,
16  std::shared_ptr<ActivityRegistry> areg,
17  StreamContext const* streamContext,
18  std::atomic<bool>* stopProcessingEvent,
19  PathContext::PathType pathType) :
20  timesRun_(),
21  timesPassed_(),
22  timesFailed_(),
23  timesExcept_(),
24  state_(hlt::Ready),
25  bitpos_(bitpos),
26  trptr_(trptr),
27  actReg_(areg),
28  act_table_(&actions),
29  workers_(workers),
30  pathContext_(path_name, streamContext, bitpos, pathType),
31  stopProcessingEvent_(stopProcessingEvent){
32 
33  for (auto& workerInPath : workers_) {
34  workerInPath.setPathContext(&pathContext_);
35  }
36  }
37 
38  Path::Path(Path const& r) :
39  timesRun_(r.timesRun_),
40  timesPassed_(r.timesPassed_),
41  timesFailed_(r.timesFailed_),
42  timesExcept_(r.timesExcept_),
43  state_(r.state_),
44  bitpos_(r.bitpos_),
45  trptr_(r.trptr_),
46  actReg_(r.actReg_),
47  act_table_(r.act_table_),
48  workers_(r.workers_),
49  earlyDeleteHelpers_(r.earlyDeleteHelpers_),
50  pathContext_(r.pathContext_),
51  stopProcessingEvent_(r.stopProcessingEvent_){
52 
53  for (auto& workerInPath : workers_) {
54  workerInPath.setPathContext(&pathContext_);
55  }
56  }
57 
58 
59  bool
61  int nwrwue,
62  bool isEvent,
63  bool begin,
65  ModuleDescription const& desc,
66  std::string const& id) {
67 
68  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
69 
70  bool should_continue = true;
71 
72  // there is no support as of yet for specific paths having
73  // different exception behavior
74 
75  // If not processing an event, always rethrow.
77  switch(action) {
79  should_continue = false;
80  edm::printCmsExceptionWarning("FailPath", e);
81  break;
82  }
84  //Need the other Paths to stop as soon as possible
86  *stopProcessingEvent_ = true;
87  }
88  }
89  default: {
90  if (isEvent) ++timesExcept_;
92  recordStatus(nwrwue, isEvent);
93  if (action == exception_actions::Rethrow) {
95  if (e.category() == pNF) {
96  std::ostringstream ost;
97  ost << "If you wish to continue processing events after a " << pNF << " exception,\n" <<
98  "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
99  e.addAdditionalInfo(ost.str());
100  }
101  }
102  throw e;
103  }
104  }
105 
106  return should_continue;
107  }
108 
109  void
111  bool isEvent,
112  bool begin,
114  ModuleDescription const& desc,
115  std::string const& id,
116  PathContext const& pathContext) {
117  std::ostringstream ost;
118  if (not isEvent) {
119  //For the event case, the Worker has already
120  // added the necessary module context to the exception
121  if (begin && branchType == InRun) {
122  ost << "Calling beginRun";
123  }
124  else if (begin && branchType == InLumi) {
125  ost << "Calling beginLuminosityBlock";
126  }
127  else if (!begin && branchType == InLumi) {
128  ost << "Calling endLuminosityBlock";
129  }
130  else if (!begin && branchType == InRun) {
131  ost << "Calling endRun";
132  }
133  else {
134  // It should be impossible to get here ...
135  ost << "Calling unknown function";
136  }
137  ost << " for module " << desc.moduleName() << "/'" << desc.moduleLabel() << "'";
138  ex.addContext(ost.str());
139  ost.str("");
140  }
141  ost << "Running path '" << pathContext.pathName() << "'";
142  ex.addContext(ost.str());
143  ost.str("");
144  ost << "Processing ";
145  ost << id;
146  ex.addContext(ost.str());
147  }
148 
149  void
150  Path::recordStatus(int nwrwue, bool isEvent) {
151  if(isEvent && trptr_) {
152  (*trptr_)[bitpos_]=HLTPathStatus(state_, nwrwue);
153  }
154  }
155 
156  void
157  Path::updateCounters(bool success, bool isEvent) {
158  if (success) {
159  if (isEvent) ++timesPassed_;
160  state_ = hlt::Pass;
161  } else {
162  if(isEvent) ++timesFailed_;
163  state_ = hlt::Fail;
164  }
165  }
166 
167  void
169  using std::placeholders::_1;
171  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
172  }
173 
174  void
175  Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
176  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
177  // have to use the space necessary
178  std::vector<EarlyDeleteHelper*> temp;
179  temp.reserve(iWorkerToDeleter.size());
180  for(unsigned int index=0; index !=size();++index) {
181  auto found = iWorkerToDeleter.find(getWorker(index));
182  if(found != iWorkerToDeleter.end()) {
183  temp.push_back(found->second);
184  found->second->addedToPath();
185  }
186  }
187  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(),temp.end());
188  earlyDeleteHelpers_.swap(tempCorrectSize);
189  }
190 
191  void
193  for(auto helper: earlyDeleteHelpers_) {
194  helper->pathFinished(iEvent);
195  }
196  }
197 
198  void
200  EventPrincipal const& iEP,
201  EventSetup const& iES,
202  StreamID const& iStreamID,
203  StreamContext const* iStreamContext) {
205  ++timesRun_;
206  waitingTasks_.add(iTask);
207  if(actReg_) {
208  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
209  }
210  state_ = hlt::Ready;
211 
212  if(workers_.empty()) {
213  finished(-1, true, std::exception_ptr(), iStreamContext);
214  return;
215  }
216 
217  runNextWorkerAsync(0,iEP,iES,iStreamID, iStreamContext);
218  }
219 
220  void
221  Path::workerFinished(std::exception_ptr const* iException,
222  unsigned int iModuleIndex,
223  EventPrincipal const& iEP, EventSetup const& iES,
224  StreamID const& iID, StreamContext const* iContext) {
225 
226  //This call also allows the WorkerInPath to update statistics
227  // so should be done even if an exception happened
228  auto& worker = workers_[iModuleIndex];
229  bool shouldContinue = worker.checkResultsOfRunWorker(true);
230  std::exception_ptr finalException;
231  if(iException) {
232  std::unique_ptr<cms::Exception> pEx;
233  try {
234  std::rethrow_exception(*iException);
235  } catch(cms::Exception& oldEx) {
236  pEx = std::make_unique<cms::Exception>(oldEx);
237  }
238  try {
239  std::ostringstream ost;
240  ost << iEP.id();
241  shouldContinue = handleWorkerFailure(*pEx, iModuleIndex, /*isEvent*/ true, /*isBegin*/ true, InEvent,
242  worker.getWorker()->description(), ost.str());
243  //If we didn't rethrow, then we effectively skipped
244  worker.skipWorker(iEP);
245  finalException = std::exception_ptr();
246  } catch(...) {
247  shouldContinue = false;
248  finalException = std::current_exception();
249  }
250  }
252  shouldContinue = false;
253  }
254  auto const nextIndex = iModuleIndex +1;
255  if (shouldContinue and nextIndex < workers_.size()) {
256  runNextWorkerAsync(nextIndex, iEP, iES, iID, iContext);
257  return;
258  }
259 
260  if (not shouldContinue) {
261  //we are leaving the path early
262  for(auto it = workers_.begin()+nextIndex, itEnd=workers_.end();
263  it != itEnd; ++it) {
264  it->skipWorker(iEP);
265  }
266  handleEarlyFinish(iEP);
267  }
268  finished(iModuleIndex, shouldContinue, finalException, iContext);
269  }
270 
271  void
272  Path::finished(int iModuleIndex, bool iSucceeded, std::exception_ptr iException, StreamContext const* iContext) {
273 
274  if(not iException) {
275  updateCounters(iSucceeded, true);
276  recordStatus(iModuleIndex, true);
277  }
278  try {
279  HLTPathStatus status(state_, iModuleIndex);
280  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
281  } catch(...) {
282  if(not iException) {
283  iException = std::current_exception();
284  }
285  }
286  waitingTasks_.doneWaiting(iException);
287  }
288 
289  void
290  Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
291  EventPrincipal const& iEP, EventSetup const& iES,
292  StreamID const& iID, StreamContext const* iContext) {
293 
294  //need to make sure Service system is activated on the reading thread
296 
297  auto nextTask = make_waiting_task( tbb::task::allocate_root(),
298  [this, iNextModuleIndex, &iEP,&iES, iID, iContext, token](std::exception_ptr const* iException)
299  {
301  this->workerFinished(iException, iNextModuleIndex, iEP,iES,iID,iContext);
302  });
303 
304  workers_[iNextModuleIndex].runWorkerAsync<
306  iEP,
307  iES,
308  iID,
309  iContext);
310  }
311 
312 }
std::string const & pathName() const
Definition: PathContext.h:37
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:150
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:192
not [yet] run
Definition: HLTenums.h:18
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:50
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:107
int timesFailed_
Definition: Path.h:96
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:46
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
reject
Definition: HLTenums.h:20
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:97
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:290
PathContext pathContext_
Definition: Path.h:109
size_type size() const
Definition: Path.h:79
WaitingTaskList waitingTasks_
Definition: Path.h:110
void reset()
Resets access to the resource so that added tasks will wait.
actions
Definition: Schedule.cc:384
ServiceToken presentToken() const
std::string const & moduleLabel() const
BranchType
Definition: BranchType.h:11
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:103
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
TrigResPtr trptr_
Definition: Path.h:102
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:221
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:48
accept
Definition: HLTenums.h:19
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:199
int bitpos_
Definition: Path.h:101
Definition: Path.h:42
static ServiceRegistry & instance()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
areg
Definition: Schedule.cc:384
WorkersInPath workers_
Definition: Path.h:106
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:110
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:157
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:12
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:90
void addContext(std::string const &context)
Definition: Exception.cc:227
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:60
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:175
string action
Definition: mps_fire.py:28
ExceptionToActionTable const * act_table_
Definition: Path.h:104
State state_
Definition: Path.h:99
#define begin
Definition: vmac.h:30
Worker const * getWorker(size_type i) const
Definition: Path.h:84
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *)
Definition: Path.cc:272
int timesPassed_
Definition: Path.h:95
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:111
int timesRun_
Definition: Path.h:94
void clearCounters()
Definition: Path.cc:168
tuple status
Definition: mps_update.py:57
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)