test
CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
Path.cc
Go to the documentation of this file.
1 
8 
9 #include <algorithm>
10 
11 namespace edm {
12  Path::Path(int bitpos, std::string const& path_name,
13  WorkersInPath const& workers,
14  TrigResPtr trptr,
16  std::shared_ptr<ActivityRegistry> areg,
17  StreamContext const* streamContext,
18  std::atomic<bool>* stopProcessingEvent,
19  PathContext::PathType pathType) :
20  timesRun_(),
21  timesPassed_(),
22  timesFailed_(),
23  timesExcept_(),
24  state_(hlt::Ready),
25  bitpos_(bitpos),
26  trptr_(trptr),
27  actReg_(areg),
28  act_table_(&actions),
29  workers_(workers),
30  pathContext_(path_name, streamContext, bitpos, pathType),
31  stopProcessingEvent_(stopProcessingEvent){
32 
33  for (auto& workerInPath : workers_) {
34  workerInPath.setPathContext(&pathContext_);
35  }
36  }
37 
38  Path::Path(Path const& r) :
39  timesRun_(r.timesRun_),
40  timesPassed_(r.timesPassed_),
41  timesFailed_(r.timesFailed_),
42  timesExcept_(r.timesExcept_),
43  state_(r.state_),
44  bitpos_(r.bitpos_),
45  trptr_(r.trptr_),
46  actReg_(r.actReg_),
47  act_table_(r.act_table_),
48  workers_(r.workers_),
49  earlyDeleteHelpers_(r.earlyDeleteHelpers_),
50  pathContext_(r.pathContext_),
51  stopProcessingEvent_(r.stopProcessingEvent_){
52 
53  for (auto& workerInPath : workers_) {
54  workerInPath.setPathContext(&pathContext_);
55  }
56  }
57 
58 
59  bool
61  int nwrwue,
62  bool isEvent,
63  bool begin,
65  ModuleDescription const& desc,
66  std::string const& id) {
67 
68  exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
69 
70  bool should_continue = true;
71 
72  // there is no support as of yet for specific paths having
73  // different exception behavior
74 
75  // If not processing an event, always rethrow.
77  switch(action) {
79  should_continue = false;
80  edm::printCmsExceptionWarning("FailPath", e);
81  break;
82  }
84  //Need the other Paths to stop as soon as possible
86  *stopProcessingEvent_ = true;
87  }
88  }
89  default: {
90  if (isEvent) ++timesExcept_;
92  recordStatus(nwrwue, isEvent);
93  if (action == exception_actions::Rethrow) {
95  if (e.category() == pNF) {
96  std::ostringstream ost;
97  ost << "If you wish to continue processing events after a " << pNF << " exception,\n" <<
98  "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the configuration.\n";
99  e.addAdditionalInfo(ost.str());
100  }
101  }
102  throw e;
103  }
104  }
105 
106  return should_continue;
107  }
108 
109  void
111  bool isEvent,
112  bool begin,
114  ModuleDescription const& desc,
115  std::string const& id,
116  PathContext const& pathContext) {
117  std::ostringstream ost;
118  if (isEvent) {
119  ost << "Calling event method";
120  }
121  else if (begin && branchType == InRun) {
122  ost << "Calling beginRun";
123  }
124  else if (begin && branchType == InLumi) {
125  ost << "Calling beginLuminosityBlock";
126  }
127  else if (!begin && branchType == InLumi) {
128  ost << "Calling endLuminosityBlock";
129  }
130  else if (!begin && branchType == InRun) {
131  ost << "Calling endRun";
132  }
133  else {
134  // It should be impossible to get here ...
135  ost << "Calling unknown function";
136  }
137  ost << " for module " << desc.moduleName() << "/'" << desc.moduleLabel() << "'";
138  ex.addContext(ost.str());
139  ost.str("");
140  ost << "Running path '" << pathContext.pathName() << "'";
141  ex.addContext(ost.str());
142  ost.str("");
143  ost << "Processing ";
144  ost << id;
145  ex.addContext(ost.str());
146  }
147 
148  void
149  Path::recordStatus(int nwrwue, bool isEvent) {
150  if(isEvent && trptr_) {
151  (*trptr_)[bitpos_]=HLTPathStatus(state_, nwrwue);
152  }
153  }
154 
155  void
156  Path::updateCounters(bool success, bool isEvent) {
157  if (success) {
158  if (isEvent) ++timesPassed_;
159  state_ = hlt::Pass;
160  } else {
161  if(isEvent) ++timesFailed_;
162  state_ = hlt::Fail;
163  }
164  }
165 
166  void
168  using std::placeholders::_1;
170  for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
171  }
172 
173  void
174  Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
175  //we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
176  // have to use the space necessary
177  std::vector<EarlyDeleteHelper*> temp;
178  temp.reserve(iWorkerToDeleter.size());
179  for(unsigned int index=0; index !=size();++index) {
180  auto found = iWorkerToDeleter.find(getWorker(index));
181  if(found != iWorkerToDeleter.end()) {
182  temp.push_back(found->second);
183  found->second->addedToPath();
184  }
185  }
186  std::vector<EarlyDeleteHelper*> tempCorrectSize(temp.begin(),temp.end());
187  earlyDeleteHelpers_.swap(tempCorrectSize);
188  }
189 
190  void
192  for(auto helper: earlyDeleteHelpers_) {
193  helper->pathFinished(iEvent);
194  }
195  }
196 
197  void
199  EventPrincipal const& iEP,
200  EventSetup const& iES,
201  StreamID const& iStreamID,
202  StreamContext const* iStreamContext) {
204  ++timesRun_;
205  waitingTasks_.add(iTask);
206  if(actReg_) {
207  actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
208  }
209  state_ = hlt::Ready;
210 
211  if(workers_.empty()) {
212  finished(-1, true, std::exception_ptr(), iStreamContext);
213  return;
214  }
215 
216  runNextWorkerAsync(0,iEP,iES,iStreamID, iStreamContext);
217  }
218 
219  void
220  Path::workerFinished(std::exception_ptr const* iException,
221  unsigned int iModuleIndex,
222  EventPrincipal const& iEP, EventSetup const& iES,
223  StreamID const& iID, StreamContext const* iContext) {
224 
225  //This call also allows the WorkerInPath to update statistics
226  // so should be done even if an exception happened
227  auto& worker = workers_[iModuleIndex];
228  bool shouldContinue = worker.checkResultsOfRunWorker(true);
229  std::exception_ptr finalException;
230  if(iException) {
231  std::unique_ptr<cms::Exception> pEx;
232  try {
233  std::rethrow_exception(*iException);
234  } catch(cms::Exception& oldEx) {
235  pEx = std::make_unique<cms::Exception>(oldEx);
236  }
237  try {
238  std::ostringstream ost;
239  ost << iEP.id();
240  shouldContinue = handleWorkerFailure(*pEx, iModuleIndex, /*isEvent*/ true, /*isBegin*/ true, InEvent,
241  worker.getWorker()->description(), ost.str());
242  //If we didn't rethrow, then we effectively skipped
243  worker.skipWorker(iEP);
244  finalException = std::exception_ptr();
245  } catch(...) {
246  shouldContinue = false;
247  finalException = std::current_exception();
248  }
249  }
251  shouldContinue = false;
252  }
253  auto const nextIndex = iModuleIndex +1;
254  if (shouldContinue and nextIndex < workers_.size()) {
255  runNextWorkerAsync(nextIndex, iEP, iES, iID, iContext);
256  return;
257  }
258 
259  if (not shouldContinue) {
260  //we are leaving the path early
261  for(auto it = workers_.begin()+nextIndex, itEnd=workers_.end();
262  it != itEnd; ++it) {
263  it->skipWorker(iEP);
264  }
265  handleEarlyFinish(iEP);
266  }
267  finished(iModuleIndex, shouldContinue, finalException, iContext);
268  }
269 
270  void
271  Path::finished(int iModuleIndex, bool iSucceeded, std::exception_ptr iException, StreamContext const* iContext) {
272 
273  if(not iException) {
274  updateCounters(iSucceeded, true);
275  recordStatus(iModuleIndex, true);
276  }
277  try {
278  HLTPathStatus status(state_, iModuleIndex);
279  actReg_->postPathEventSignal_(*iContext, pathContext_, status);
280  } catch(...) {
281  if(not iException) {
282  iException = std::current_exception();
283  }
284  }
285  waitingTasks_.doneWaiting(iException);
286  }
287 
288  void
289  Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
290  EventPrincipal const& iEP, EventSetup const& iES,
291  StreamID const& iID, StreamContext const* iContext) {
292 
293  //need to make sure Service system is activated on the reading thread
295 
296  auto nextTask = make_waiting_task( tbb::task::allocate_root(),
297  [this, iNextModuleIndex, &iEP,&iES, iID, iContext, token](std::exception_ptr const* iException)
298  {
300  this->workerFinished(iException, iNextModuleIndex, iEP,iES,iID,iContext);
301  });
302 
303  workers_[iNextModuleIndex].runWorkerAsync<
305  iEP,
306  iES,
307  iID,
308  iContext);
309  }
310 
311 }
std::string const & pathName() const
Definition: PathContext.h:37
void recordStatus(int nwrwue, bool isEvent)
Definition: Path.cc:149
void handleEarlyFinish(EventPrincipal const &)
Definition: Path.cc:191
not [yet] run
Definition: HLTenums.h:18
static const std::string & codeToString(Code)
-----------— implementation details ---------------—
Definition: EDMException.cc:50
std::vector< EarlyDeleteHelper * > earlyDeleteHelpers_
Definition: Path.h:107
int timesFailed_
Definition: Path.h:96
std::vector< WorkerInPath > WorkersInPath
Definition: Path.h:46
void add(WaitingTask *)
Adds task to the waiting list.
EventID const & id() const
reject
Definition: HLTenums.h:20
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:183
exception_actions::ActionCodes find(const std::string &category) const
int timesExcept_
Definition: Path.h:97
void runNextWorkerAsync(unsigned int iNextModuleIndex, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:289
PathContext pathContext_
Definition: Path.h:109
size_type size() const
Definition: Path.h:79
WaitingTaskList waitingTasks_
Definition: Path.h:110
void reset()
Resets access to the resource so that added tasks will wait.
actions
Definition: Schedule.cc:383
ServiceToken presentToken() const
std::string const & moduleLabel() const
BranchType
Definition: BranchType.h:11
std::shared_ptr< ActivityRegistry > actReg_
Definition: Path.h:103
Func for_all(ForwardSequence &s, Func f)
wrapper for std::for_each
Definition: Algorithms.h:16
TrigResPtr trptr_
Definition: Path.h:102
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:230
void workerFinished(std::exception_ptr const *iException, unsigned int iModuleIndex, EventPrincipal const &iEP, EventSetup const &iES, StreamID const &iID, StreamContext const *iContext)
Definition: Path.cc:220
std::shared_ptr< HLTGlobalStatus > TrigResPtr
Definition: Path.h:48
accept
Definition: HLTenums.h:19
void processOneOccurrenceAsync(WaitingTask *, EventPrincipal const &, EventSetup const &, StreamID const &, StreamContext const *)
Definition: Path.cc:198
int bitpos_
Definition: Path.h:101
Definition: Path.h:42
static ServiceRegistry & instance()
void addAdditionalInfo(std::string const &info)
Definition: Exception.cc:235
areg
Definition: Schedule.cc:383
WorkersInPath workers_
Definition: Path.h:106
static void exceptionContext(cms::Exception &ex, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id, PathContext const &)
Definition: Path.cc:110
void updateCounters(bool succeed, bool isEvent)
Definition: Path.cc:156
Path(int bitpos, std::string const &path_name, WorkersInPath const &workers, TrigResPtr trptr, ExceptionToActionTable const &actions, std::shared_ptr< ActivityRegistry > reg, StreamContext const *streamContext, std::atomic< bool > *stopProcessEvent, PathContext::PathType pathType)
Definition: Path.cc:12
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:88
void addContext(std::string const &context)
Definition: Exception.cc:227
bool handleWorkerFailure(cms::Exception &e, int nwrwue, bool isEvent, bool begin, BranchType branchType, ModuleDescription const &, std::string const &id)
Definition: Path.cc:60
void setEarlyDeleteHelpers(std::map< const Worker *, EarlyDeleteHelper * > const &)
Definition: Path.cc:174
string action
Definition: mps_fire.py:28
ExceptionToActionTable const * act_table_
Definition: Path.h:104
State state_
Definition: Path.h:99
#define begin
Definition: vmac.h:30
Worker const * getWorker(size_type i) const
Definition: Path.h:84
void finished(int iModuleIndex, bool iSucceeded, std::exception_ptr, StreamContext const *)
Definition: Path.cc:271
int timesPassed_
Definition: Path.h:95
std::atomic< bool > * stopProcessingEvent_
Definition: Path.h:111
int timesRun_
Definition: Path.h:94
void clearCounters()
Definition: Path.cc:167
tuple status
Definition: mps_update.py:57
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)