CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
4 
10 
11 namespace edm {
12  namespace {
13  class ModuleBeginJobSignalSentry {
14  public:
15  ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
16  if (a_)
17  a_->preModuleBeginJobSignal_(*md_);
18  }
19  ~ModuleBeginJobSignalSentry() {
20  if (a_)
21  a_->postModuleBeginJobSignal_(*md_);
22  }
23 
24  private:
25  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
26  ModuleDescription const* md_;
27  };
28 
29  class ModuleEndJobSignalSentry {
30  public:
31  ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
32  if (a_)
33  a_->preModuleEndJobSignal_(*md_);
34  }
35  ~ModuleEndJobSignalSentry() {
36  if (a_)
37  a_->postModuleEndJobSignal_(*md_);
38  }
39 
40  private:
41  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
42  ModuleDescription const* md_;
43  };
44 
45  class ModuleBeginStreamSignalSentry {
46  public:
47  ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
48  : a_(a), sc_(sc), mcc_(mcc) {
49  if (a_)
50  a_->preModuleBeginStreamSignal_(sc_, mcc_);
51  }
52  ~ModuleBeginStreamSignalSentry() {
53  if (a_)
54  a_->postModuleBeginStreamSignal_(sc_, mcc_);
55  }
56 
57  private:
58  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
59  StreamContext const& sc_;
60  ModuleCallingContext const& mcc_;
61  };
62 
63  class ModuleEndStreamSignalSentry {
64  public:
65  ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
66  : a_(a), sc_(sc), mcc_(mcc) {
67  if (a_)
68  a_->preModuleEndStreamSignal_(sc_, mcc_);
69  }
70  ~ModuleEndStreamSignalSentry() {
71  if (a_)
72  a_->postModuleEndStreamSignal_(sc_, mcc_);
73  }
74 
75  private:
76  ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
77  StreamContext const& sc_;
78  ModuleCallingContext const& mcc_;
79  };
80 
81  } // namespace
82 
84  : timesRun_(0),
85  timesVisited_(0),
86  timesPassed_(0),
87  timesFailed_(0),
88  timesExcept_(0),
89  state_(Ready),
90  numberOfPathsOn_(0),
91  numberOfPathsLeftToRun_(0),
92  moduleCallingContext_(&iMD),
93  actions_(iActions),
94  cached_exception_(),
95  actReg_(),
96  earlyDeleteHelper_(nullptr),
97  workStarted_(false),
98  ranAcquireWithoutException_(false) {}
99 
101 
102  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
103 
105  ModuleCallingContext const* imcc = mcc;
106  while ((imcc->type() == ParentContext::Type::kModule) or (imcc->type() == ParentContext::Type::kInternal)) {
107  std::ostringstream iost;
109  iost << "Prefetching for module ";
110  } else {
111  iost << "Calling method for module ";
112  }
113  iost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
114 
115  if (imcc->type() == ParentContext::Type::kInternal) {
116  iost << " (probably inside some kind of mixing module)";
117  imcc = imcc->internalContext()->moduleCallingContext();
118  } else {
119  imcc = imcc->moduleCallingContext();
120  }
121  ex.addContext(iost.str());
122  }
123  std::ostringstream ost;
125  ost << "Prefetching for module ";
126  } else {
127  ost << "Calling method for module ";
128  }
129  ost << imcc->moduleDescription()->moduleName() << "/'" << imcc->moduleDescription()->moduleLabel() << "'";
130  ex.addContext(ost.str());
131 
132  if (imcc->type() == ParentContext::Type::kPlaceInPath) {
133  ost.str("");
134  ost << "Running path '";
135  ost << imcc->placeInPathContext()->pathContext()->pathName() << "'";
136  ex.addContext(ost.str());
137  auto streamContext = imcc->placeInPathContext()->pathContext()->streamContext();
138  if (streamContext) {
139  ost.str("");
140  edm::exceptionContext(ost, *streamContext);
141  ex.addContext(ost.str());
142  }
143  } else {
144  if (imcc->type() == ParentContext::Type::kStream) {
145  ost.str("");
146  edm::exceptionContext(ost, *(imcc->streamContext()));
147  ex.addContext(ost.str());
148  } else if (imcc->type() == ParentContext::Type::kGlobal) {
149  ost.str("");
150  edm::exceptionContext(ost, *(imcc->globalContext()));
151  ex.addContext(ost.str());
152  }
153  }
154  }
155 
156  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
157  ParentContext const& parentContext,
158  bool isEvent,
159  TransitionIDValueBase const& iID) const {
160  // NOTE: the warning printed as a result of ignoring or failing
161  // a module will only be printed during the full true processing
162  // pass of this module
163 
164  // Get the action corresponding to this exception. However, if processing
165  // something other than an event (e.g. run, lumi) always rethrow.
166  if (not isEvent) {
167  return true;
168  }
169  try {
170  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
171  } catch (cms::Exception& ex) {
173 
174  if (action == exception_actions::Rethrow) {
175  return true;
176  }
177 
178  ModuleCallingContext tempContext(&description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
179 
180  // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
181  // as IgnoreCompletely, so any subsequent OutputModules are still run.
182  // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
183  ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
184  if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
185  top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
186  if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
187  action == exception_actions::FailPath) {
189  }
190  }
191  if (action == exception_actions::IgnoreCompletely) {
192  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
193  return false;
194  }
195  }
196  return true;
197  }
198 
200  ServiceToken const& token,
201  ParentContext const& parentContext,
202  Principal const& iPrincipal) {
203  // Prefetch products the module declares it consumes (not including the products it maybe consumes)
204  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
205 
207 
208  if (iPrincipal.branchType() == InEvent) {
209  actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
210  }
211 
212  //Need to be sure the ref count isn't set to 0 immediately
213  iTask->increment_ref_count();
214  for (auto const& item : items) {
215  ProductResolverIndex productResolverIndex = item.productResolverIndex();
216  bool skipCurrentProcess = item.skipCurrentProcess();
217  if (productResolverIndex != ProductResolverIndexAmbiguous) {
218  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
219  }
220  }
221 
222  if (iPrincipal.branchType() == InEvent) {
224  }
225 
226  if (0 == iTask->decrement_ref_count()) {
227  //if everything finishes before we leave this routine, we need to launch the task
228  tbb::task::spawn(*iTask);
229  }
230  }
231 
233  ServiceToken const& token,
234  StreamID id,
235  EventPrincipal const* iPrincipal) {
236  successTask->increment_ref_count();
237 
238  auto choiceTask = edm::make_waiting_task(
239  tbb::task::allocate_root(), [id, successTask, iPrincipal, this, token](std::exception_ptr const*) {
240  ServiceRegistry::Operate guard(token);
241  try {
242  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
243  timesRun_.fetch_add(1, std::memory_order_relaxed);
244  setPassed<true>();
245  waitingTasks_.doneWaiting(nullptr);
246  //TBB requires that destroyed tasks have count 0
247  if (0 == successTask->decrement_ref_count()) {
248  tbb::task::destroy(*successTask);
249  }
250  return;
251  }
252  } catch (...) {
253  }
254  if (0 == successTask->decrement_ref_count()) {
255  tbb::task::spawn(*successTask);
256  }
257  });
258 
259  WaitingTaskHolder choiceHolder{choiceTask};
260 
261  std::vector<ProductResolverIndexAndSkipBit> items;
262  itemsToGetForSelection(items);
263 
264  for (auto const& item : items) {
265  ProductResolverIndex productResolverIndex = item.productResolverIndex();
266  bool skipCurrentProcess = item.skipCurrentProcess();
267  if (productResolverIndex != ProductResolverIndexAmbiguous) {
268  iPrincipal->prefetchAsync(choiceTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
269  }
270  }
271  choiceHolder.doneWaiting(std::exception_ptr{});
272  }
273 
275 
282  }
283 
285  try {
286  convertException::wrap([&]() {
287  ModuleBeginJobSignalSentry cpp(actReg_.get(), description());
288  implBeginJob();
289  });
290  } catch (cms::Exception& ex) {
291  state_ = Exception;
292  std::ostringstream ost;
293  ost << "Calling beginJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
294  ex.addContext(ost.str());
295  throw;
296  }
297  }
298 
299  void Worker::endJob() {
300  try {
301  convertException::wrap([&]() {
302  ModuleEndJobSignalSentry cpp(actReg_.get(), description());
303  implEndJob();
304  });
305  } catch (cms::Exception& ex) {
306  state_ = Exception;
307  std::ostringstream ost;
308  ost << "Calling endJob for module " << description().moduleName() << "/'" << description().moduleLabel() << "'";
309  ex.addContext(ost.str());
310  throw;
311  }
312  }
313 
314  void Worker::beginStream(StreamID id, StreamContext& streamContext) {
315  try {
316  convertException::wrap([&]() {
318  streamContext.setEventID(EventID(0, 0, 0));
319  streamContext.setRunIndex(RunIndex::invalidRunIndex());
321  streamContext.setTimestamp(Timestamp());
322  ParentContext parentContext(&streamContext);
323  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
325  ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
326  implBeginStream(id);
327  });
328  } catch (cms::Exception& ex) {
329  state_ = Exception;
330  std::ostringstream ost;
331  ost << "Calling beginStream for module " << description().moduleName() << "/'" << description().moduleLabel()
332  << "'";
333  ex.addContext(ost.str());
334  throw;
335  }
336  }
337 
338  void Worker::endStream(StreamID id, StreamContext& streamContext) {
339  try {
340  convertException::wrap([&]() {
342  streamContext.setEventID(EventID(0, 0, 0));
343  streamContext.setRunIndex(RunIndex::invalidRunIndex());
345  streamContext.setTimestamp(Timestamp());
346  ParentContext parentContext(&streamContext);
347  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
349  ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
350  implEndStream(id);
351  });
352  } catch (cms::Exception& ex) {
353  state_ = Exception;
354  std::ostringstream ost;
355  ost << "Calling endStream for module " << description().moduleName() << "/'" << description().moduleLabel()
356  << "'";
357  ex.addContext(ost.str());
358  throw;
359  }
360  }
361 
363  if (0 == --numberOfPathsLeftToRun_) {
365  }
366  }
367 
369  if (earlyDeleteHelper_) {
370  earlyDeleteHelper_->moduleRan(iEvent);
371  }
372  }
373 
375  EventSetupImpl const& es,
376  ParentContext const& parentContext,
377  WaitingTaskWithArenaHolder& holder) {
378  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
379  try {
380  convertException::wrap([&]() { this->implDoAcquire(ep, es, &moduleCallingContext_, holder); });
381  } catch (cms::Exception& ex) {
384  if (shouldRethrowException(std::current_exception(), parentContext, true, idValue)) {
385  timesRun_.fetch_add(1, std::memory_order_relaxed);
386  throw;
387  }
388  }
389  }
390 
391  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
392  EventPrincipal const& ep,
393  EventSetupImpl const& es,
394  ParentContext const& parentContext,
397  std::exception_ptr exceptionPtr;
398  if (iEPtr) {
399  assert(*iEPtr);
401  if (shouldRethrowException(*iEPtr, parentContext, true, idValue)) {
402  exceptionPtr = *iEPtr;
403  }
405  } else {
406  try {
407  runAcquire(ep, es, parentContext, holder);
409  } catch (...) {
410  exceptionPtr = std::current_exception();
411  }
412  }
413  // It is important this is after runAcquire completely finishes
414  holder.doneWaiting(exceptionPtr);
415  }
416 
417  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
418  ParentContext const& parentContext) {
420  try {
421  convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
422  } catch (cms::Exception& ex) {
423  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
425  return std::current_exception();
426  }
427  }
428  return *iEPtr;
429  }
430 
432  WaitingTask* runModuleTask,
433  ParentContext const& parentContext)
434  : m_worker(worker), m_runModuleTask(runModuleTask), m_parentContext(parentContext) {}
435 
437  auto excptr = exceptionPtr();
438  if (excptr) {
439  // increment the ref count so the holder will not spawn it
440  m_runModuleTask->set_ref_count(1);
443  }
444  m_runModuleTask->set_ref_count(0);
445  // Depend on TBB Scheduler Bypass to run the next task
446  return m_runModuleTask;
447  }
448 } // namespace edm
std::string const & pathName() const
Definition: PathContext.h:30
GlobalContext const * globalContext() const
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:276
void skipOnPath()
Definition: Worker.cc:362
void setTimestamp(Timestamp const &v)
Definition: StreamContext.h:69
ModuleDescription const & description() const
Definition: Worker.h:190
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:607
unsigned int ProductResolverIndex
StreamContext const * getStreamContext() const
ModuleCallingContext const * moduleCallingContext() const
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:274
ModuleCallingContext const * getTopModuleCallingContext() const
static void exceptionContext(cms::Exception &ex, ModuleCallingContext const *mcc)
Definition: Worker.cc:104
virtual ~Worker()
Definition: Worker.cc:100
void endJob()
Definition: Worker.cc:299
void runAcquireAfterAsyncPrefetch(std::exception_ptr const *iEPtr, EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder holder)
Definition: Worker.cc:391
InternalContext const * internalContext() const
#define nullptr
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:614
std::string const & moduleName() const
std::string const & category() const
Definition: Exception.cc:143
exception_actions::ActionCodes find(const std::string &category) const
ModuleCallingContext const * moduleCallingContext() const
void beginStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:314
void exceptionContext(std::ostream &, GlobalContext const &)
ExceptionToActionTable const * actions_
Definition: Worker.h:611
std::string const & moduleLabel() const
void setContext(State state, ParentContext const &parent, ModuleCallingContext const *previousOnThread)
bool ranAcquireWithoutException_
Definition: Worker.h:620
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:102
ParentContext const & parent() const
void prefetchAsync(WaitingTask *waitTask, ProductResolverIndex index, bool skipCurrentProcess, ServiceToken const &token, ModuleCallingContext const *mcc) const
Definition: Principal.cc:606
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:609
virtual void itemsToGetForSelection(std::vector< ProductResolverIndexAndSkipBit > &) const =0
void setTransition(Transition v)
Definition: StreamContext.h:65
std::exception_ptr cached_exception_
Definition: Worker.h:612
static RunIndex invalidRunIndex()
Definition: RunIndex.cc:9
bool isEndPath() const
Definition: PathContext.h:35
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
int iEvent
Definition: GenABIO.cc:224
void doneWaiting(std::exception_ptr iExcept)
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, TransitionIDValueBase const &iID) const
Definition: Worker.cc:156
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:618
void doneWaiting(std::exception_ptr iExcept)
HandleExternalWorkExceptionTask(Worker *worker, WaitingTask *runModuleTask, ParentContext const &parentContext)
Definition: Worker.cc:431
void setLuminosityBlockIndex(LuminosityBlockIndex const &v)
Definition: StreamContext.h:68
BranchType const & branchType() const
Definition: Principal.h:180
PathContext const * pathContext() const
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
ModuleDescription const * moduleDescription() const
virtual void implDoAcquire(EventPrincipal const &, EventSetupImpl const &c, ModuleCallingContext const *mcc, WaitingTaskWithArenaHolder &holder)=0
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:83
std::atomic< State > state_
Definition: Worker.h:605
void prefetchAsync(WaitingTask *, ServiceToken const &, ParentContext const &parentContext, Principal const &)
Definition: Worker.cc:199
virtual void implEndJob()=0
StreamContext const * streamContext() const
static LuminosityBlockIndex invalidLuminosityBlockIndex()
virtual bool implDoPrePrefetchSelection(StreamID id, EventPrincipal const &ep, ModuleCallingContext const *mcc)=0
void prePrefetchSelectionAsync(WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *)
Definition: Worker.cc:232
virtual void implBeginJob()=0
FunctorWaitingTask< F > * make_waiting_task(ALLOC &&iAlloc, F f)
Definition: WaitingTask.h:87
void addContext(std::string const &context)
Definition: Exception.cc:165
virtual void implBeginStream(StreamID)=0
virtual std::vector< ProductResolverIndexAndSkipBit > const & itemsToGetFrom(BranchType) const =0
virtual void implEndStream(StreamID)=0
StreamContext const * streamContext() const
Definition: PathContext.h:31
void setEventID(EventID const &v)
Definition: StreamContext.h:66
ModuleCallingContext const * previousModuleOnThread() const
HLT enums.
void runAcquire(EventPrincipal const &ep, EventSetupImpl const &es, ParentContext const &parentContext, WaitingTaskWithArenaHolder &holder)
Definition: Worker.cc:374
double a
Definition: hdecay.h:119
void beginJob()
Definition: Worker.cc:284
void endStream(StreamID id, StreamContext &streamContext)
Definition: Worker.cc:338
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:368
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:616
virtual void preActionBeforeRunEventAsync(WaitingTask *iTask, ModuleCallingContext const &moduleCallingContext, Principal const &iPrincipal) const =0
std::atomic< int > timesRun_
Definition: Worker.h:600
std::exception_ptr const * exceptionPtr() const
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
PlaceInPathContext const * placeInPathContext() const
void setRunIndex(RunIndex const &v)
Definition: StreamContext.h:67
std::exception_ptr handleExternalWorkException(std::exception_ptr const *iEPtr, ParentContext const &parentContext)
Definition: Worker.cc:417
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)