CMS 3D CMS Logo

Worker.cc
Go to the documentation of this file.
1 
2 /*----------------------------------------------------------------------
3 ----------------------------------------------------------------------*/
17 
18 namespace edm {
19  namespace {
20  class ModuleBeginJobTraits {
21  public:
22  using Context = GlobalContext;
23  static void preModuleSignal(ActivityRegistry* activityRegistry,
24  GlobalContext const*,
25  ModuleCallingContext const* moduleCallingContext) {
26  activityRegistry->preModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
27  }
28  static void postModuleSignal(ActivityRegistry* activityRegistry,
29  GlobalContext const*,
30  ModuleCallingContext const* moduleCallingContext) {
31  activityRegistry->postModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
32  }
33  };
34 
35  class ModuleEndJobTraits {
36  public:
37  using Context = GlobalContext;
38  static void preModuleSignal(ActivityRegistry* activityRegistry,
39  GlobalContext const*,
40  ModuleCallingContext const* moduleCallingContext) {
41  activityRegistry->preModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
42  }
43  static void postModuleSignal(ActivityRegistry* activityRegistry,
44  GlobalContext const*,
45  ModuleCallingContext const* moduleCallingContext) {
46  activityRegistry->postModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
47  }
48  };
49 
50  class ModuleBeginStreamTraits {
51  public:
52  using Context = StreamContext;
53  static void preModuleSignal(ActivityRegistry* activityRegistry,
54  StreamContext const* streamContext,
55  ModuleCallingContext const* moduleCallingContext) {
56  activityRegistry->preModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
57  }
58  static void postModuleSignal(ActivityRegistry* activityRegistry,
59  StreamContext const* streamContext,
60  ModuleCallingContext const* moduleCallingContext) {
61  activityRegistry->postModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
62  }
63  };
64 
65  class ModuleEndStreamTraits {
66  public:
67  using Context = StreamContext;
68  static void preModuleSignal(ActivityRegistry* activityRegistry,
69  StreamContext const* streamContext,
70  ModuleCallingContext const* moduleCallingContext) {
71  activityRegistry->preModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
72  }
73  static void postModuleSignal(ActivityRegistry* activityRegistry,
74  StreamContext const* streamContext,
75  ModuleCallingContext const* moduleCallingContext) {
76  activityRegistry->postModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
77  }
78  };
79 
80  } // namespace
81 
83  : timesRun_(0),
84  timesVisited_(0),
85  timesPassed_(0),
86  timesFailed_(0),
87  timesExcept_(0),
88  state_(Ready),
89  numberOfPathsOn_(0),
90  numberOfPathsLeftToRun_(0),
91  moduleCallingContext_(&iMD),
92  actions_(iActions),
93  cached_exception_(),
94  actReg_(),
95  earlyDeleteHelper_(nullptr),
96  workStarted_(false),
97  ranAcquireWithoutException_(false) {
99  }
100 
102 
103  void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
104 
107  if (pset and pset->exists("@shouldTryToContinue")) {
108  shouldTryToContinue_ = true;
109  }
110  }
111 
112  bool Worker::shouldRethrowException(std::exception_ptr iPtr,
113  ParentContext const& parentContext,
114  bool isEvent,
115  bool shouldTryToContinue) const noexcept {
116  // NOTE: the warning printed as a result of ignoring or failing
117  // a module will only be printed during the full true processing
118  // pass of this module
119 
120  // Get the action corresponding to this exception. However, if processing
121  // something other than an event (e.g. run, lumi) always rethrow.
122  if (not isEvent) {
123  return true;
124  }
125  try {
126  convertException::wrap([&]() { std::rethrow_exception(iPtr); });
127  } catch (cms::Exception& ex) {
128  exception_actions::ActionCodes action = actions_->find(ex.category());
129 
131  return true;
132  }
134  if (shouldTryToContinue) {
135  edm::printCmsExceptionWarning("TryToContinue", ex);
136  }
137  return not shouldTryToContinue;
138  }
140  edm::printCmsExceptionWarning("IgnoreCompletely", ex);
141  return false;
142  }
143  }
144  return true;
145  }
146 
147  void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
148  WaitingTask* successTask,
149  ServiceToken const& token,
150  StreamID id,
151  EventPrincipal const* iPrincipal) noexcept {
152  successTask->increment_ref_count();
153 
154  ServiceWeakToken weakToken = token;
155  auto choiceTask =
156  edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
157  ServiceRegistry::Operate guard(weakToken.lock());
158  try {
159  bool selected = convertException::wrap([&]() {
160  if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
161  timesRun_.fetch_add(1, std::memory_order_relaxed);
162  setPassed<true>();
163  waitingTasks_.doneWaiting(nullptr);
164  //TBB requires that destroyed tasks have count 0
165  if (0 == successTask->decrement_ref_count()) {
166  TaskSentry s(successTask);
167  }
168  return false;
169  }
170  return true;
171  });
172  if (not selected) {
173  return;
174  }
175 
176  } catch (cms::Exception& e) {
177  edm::exceptionContext(e, moduleCallingContext_);
178  setException<true>(std::current_exception());
179  waitingTasks_.doneWaiting(std::current_exception());
180  //TBB requires that destroyed tasks have count 0
181  if (0 == successTask->decrement_ref_count()) {
182  TaskSentry s(successTask);
183  }
184  return;
185  }
186  if (0 == successTask->decrement_ref_count()) {
187  group.run([successTask]() {
188  TaskSentry s(successTask);
189  successTask->execute();
190  });
191  }
192  });
193 
194  WaitingTaskHolder choiceHolder{group, choiceTask};
195 
196  std::vector<ProductResolverIndexAndSkipBit> items;
197  itemsToGetForSelection(items);
198 
199  for (auto const& item : items) {
200  ProductResolverIndex productResolverIndex = item.productResolverIndex();
201  bool skipCurrentProcess = item.skipCurrentProcess();
202  if (productResolverIndex != ProductResolverIndexAmbiguous and
203  productResolverIndex != ProductResolverIndexInvalid) {
204  iPrincipal->prefetchAsync(
205  choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
206  }
207  }
208  choiceHolder.doneWaiting(std::exception_ptr{});
209  }
210 
212  EventSetupImpl const& iImpl,
213  Transition iTrans,
214  ServiceToken const& iToken) noexcept {
216  return;
217  }
218  auto const& recs = esRecordsToGetFrom(iTrans);
219  auto const& items = esItemsToGetFrom(iTrans);
220 
221  assert(items.size() == recs.size());
222  if (items.empty()) {
223  return;
224  }
225 
226  for (size_t i = 0; i != items.size(); ++i) {
227  if (recs[i] != ESRecordIndex{}) {
228  auto rec = iImpl.findImpl(recs[i]);
229  if (rec) {
230  rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
231  }
232  }
233  }
234  }
235 
237  ServiceToken const& token,
238  Principal const& iPrincipal) const noexcept {
239  // Prefetch products the module declares it consumes
240  std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
241 
242  for (auto const& item : items) {
243  ProductResolverIndex productResolverIndex = item.productResolverIndex();
244  bool skipCurrentProcess = item.skipCurrentProcess();
245  if (productResolverIndex != ProductResolverIndexAmbiguous) {
246  iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
247  }
248  }
249  }
250 
252 
253  size_t Worker::transformIndex(edm::BranchDescription const&) const noexcept { return -1; }
255  size_t iTransformIndex,
256  EventPrincipal const& iPrincipal,
257  ServiceToken const& iToken,
258  StreamID,
259  ModuleCallingContext const& mcc,
260  StreamContext const*) noexcept {
261  ServiceWeakToken weakToken = iToken;
262 
263  //Need to make the services available early so other services can see them
264  auto task = make_waiting_task(
265  [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
266  //post prefetch signal
267  actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
268  if (iExcept) {
269  iTask.doneWaiting(*iExcept);
270  return;
271  }
272  implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
273  });
274 
275  //pre prefetch signal
276  actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
277  iPrincipal.prefetchAsync(
278  WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
279  }
280 
283  0,
288  assert(iDesc);
290  }
291 
292  void Worker::beginJob(GlobalContext const& globalContext) {
293  ParentContext parentContext(&globalContext);
294  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
295  ModuleSignalSentry<ModuleBeginJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
296 
297  try {
298  convertException::wrap([this, &sentry]() {
299  beginSucceeded_ = false;
300  sentry.preModuleSignal();
301  implBeginJob();
302  sentry.postModuleSignal();
303  beginSucceeded_ = true;
304  });
305  } catch (cms::Exception& ex) {
307  throw;
308  }
309  }
310 
311  void Worker::endJob(GlobalContext const& globalContext) {
312  if (beginSucceeded_) {
313  beginSucceeded_ = false;
314 
315  ParentContext parentContext(&globalContext);
316  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
317  ModuleSignalSentry<ModuleEndJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
318 
319  try {
320  convertException::wrap([this, &sentry]() {
321  sentry.preModuleSignal();
322  implEndJob();
323  sentry.postModuleSignal();
324  });
325  } catch (cms::Exception& ex) {
327  throw;
328  }
329  }
330  }
331 
332  void Worker::beginStream(StreamID streamID, StreamContext const& streamContext) {
333  ParentContext parentContext(&streamContext);
334  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
335  ModuleSignalSentry<ModuleBeginStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
336 
337  try {
338  convertException::wrap([this, &sentry, streamID]() {
339  beginSucceeded_ = false;
340  sentry.preModuleSignal();
341  implBeginStream(streamID);
342  sentry.postModuleSignal();
343  beginSucceeded_ = true;
344  });
345  } catch (cms::Exception& ex) {
347  throw;
348  }
349  }
350 
351  void Worker::endStream(StreamID id, StreamContext const& streamContext) {
352  if (beginSucceeded_) {
353  beginSucceeded_ = false;
354 
355  ParentContext parentContext(&streamContext);
356  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
357  ModuleSignalSentry<ModuleEndStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
358 
359  try {
360  convertException::wrap([this, &sentry, id]() {
361  sentry.preModuleSignal();
362  implEndStream(id);
363  sentry.postModuleSignal();
364  });
365  } catch (cms::Exception& ex) {
367  throw;
368  }
369  }
370  }
371 
373  try {
375  } catch (cms::Exception& ex) {
376  ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
377  throw ex;
378  }
379  }
380 
382  if (earlyDeleteHelper_) {
383  earlyDeleteHelper_->pathFinished(iEvent);
384  }
385  if (0 == --numberOfPathsLeftToRun_) {
387  }
388  }
389 
391  if (earlyDeleteHelper_) {
392  earlyDeleteHelper_->moduleRan(iEvent);
393  }
394  }
395 
397  ParentContext const& parentContext,
398  WaitingTaskWithArenaHolder& holder) {
399  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
400  try {
401  convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
402  } catch (cms::Exception& ex) {
404  if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
405  timesRun_.fetch_add(1, std::memory_order_relaxed);
406  throw;
407  }
408  }
409  }
410 
411  void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
412  EventTransitionInfo const& eventTransitionInfo,
413  ParentContext const& parentContext,
414  WaitingTaskWithArenaHolder holder) noexcept {
415  ranAcquireWithoutException_ = false;
416  std::exception_ptr exceptionPtr;
417  if (iEPtr) {
418  if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
419  exceptionPtr = iEPtr;
420  }
421  moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
422  } else {
423  // Caught exception is propagated via WaitingTaskWithArenaHolder
424  CMS_SA_ALLOW try {
425  runAcquire(eventTransitionInfo, parentContext, holder);
426  ranAcquireWithoutException_ = true;
427  } catch (...) {
428  exceptionPtr = std::current_exception();
429  }
430  }
431  // It is important this is after runAcquire completely finishes
432  holder.doneWaiting(exceptionPtr);
433  }
434 
435  std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
436  ParentContext const& parentContext) noexcept {
437  if (ranAcquireWithoutException_) {
438  try {
439  convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
440  } catch (cms::Exception& ex) {
441  ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
442  edm::exceptionContext(ex, moduleCallingContext_);
443  return std::current_exception();
444  }
445  }
446  return iEPtr;
447  }
448 
450  oneapi::tbb::task_group* group,
451  WaitingTask* runModuleTask,
452  ParentContext const& parentContext) noexcept
453  : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
454 
456  auto excptr = exceptionPtr();
458  if (excptr) {
460  }
461  }
462 } // namespace edm
bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const &parentContext, bool isEvent, bool isTryToContinue) const noexcept
Definition: Worker.cc:112
virtual void implDoAcquire(EventTransitionInfo const &, ModuleCallingContext const *, WaitingTaskWithArenaHolder &)=0
void resetModuleDescription(ModuleDescription const *)
Definition: Worker.cc:281
static const TGPicture * info(bool iBackgroundIsBlack)
#define CMS_SA_ALLOW
Definition: helper.py:1
std::atomic< int > numberOfPathsLeftToRun_
Definition: Worker.h:614
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
bool getMapped(key_type const &k, value_type &result) const
Definition: Registry.cc:17
unsigned int ProductResolverIndex
void setEarlyDeleteHelper(EarlyDeleteHelper *iHelper)
Definition: Worker.cc:251
std::exception_ptr exceptionPtr() const noexcept
Returns exception thrown by dependent task.
Definition: WaitingTask.h:51
virtual ~Worker()
Definition: Worker.cc:101
ModuleCallingContext const * previousModuleOnThread() const noexcept
void doTransformAsync(WaitingTaskHolder, size_t iTransformIndex, EventPrincipal const &, ServiceToken const &, StreamID, ModuleCallingContext const &, StreamContext const *) noexcept
Definition: Worker.cc:254
std::shared_ptr< ActivityRegistry > actReg_
Definition: Worker.h:621
void beginStream(StreamID, StreamContext const &)
Definition: Worker.cc:332
ActivityRegistry * activityRegistry()
Definition: Worker.h:300
std::string const & category() const
Definition: Exception.cc:147
assert(be >=bs)
void runAcquireAfterAsyncPrefetch(std::exception_ptr, EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder) noexcept
Definition: Worker.cc:411
void setActivityRegistry(std::shared_ptr< ActivityRegistry > areg)
Definition: Worker.cc:103
oneapi::tbb::task_group * m_group
Definition: Worker.h:603
ModuleCallingContext moduleCallingContext_
Definition: Worker.h:616
void edPrefetchAsync(WaitingTaskHolder, ServiceToken const &, Principal const &) const noexcept
Definition: Worker.cc:236
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const &parentContext) noexcept
Definition: Worker.cc:435
std::exception_ptr cached_exception_
Definition: Worker.h:619
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ParameterSetID const & parameterSetID() const
int iEvent
Definition: GenABIO.cc:224
edm::WaitingTaskList waitingTasks_
Definition: Worker.h:625
virtual void implRegisterThinnedAssociations(ProductRegistry const &, ThinnedAssociationsHelper &)=0
void endStream(StreamID, StreamContext const &)
Definition: Worker.cc:351
void registerThinnedAssociations(ProductRegistry const &registry, ThinnedAssociationsHelper &helper)
Definition: Worker.cc:372
Transition
Definition: Transition.h:12
Worker(ModuleDescription const &iMD, ExceptionToActionTable const *iActions)
Definition: Worker.cc:82
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
ModuleDescription const * description() const noexcept
Definition: Worker.h:200
virtual void implEndJob()=0
ServiceToken lock() const
Definition: ServiceToken.h:101
void doneWaiting(std::exception_ptr iExcept) noexcept
HandleExternalWorkExceptionTask(Worker *worker, oneapi::tbb::task_group *group, WaitingTask *runModuleTask, ParentContext const &parentContext) noexcept
Definition: Worker.cc:449
ParentContext const & parent() const noexcept
virtual void implBeginJob()=0
bool beginSucceeded_
Definition: Worker.h:630
void prePrefetchSelectionAsync(oneapi::tbb::task_group &, WaitingTask *task, ServiceToken const &, StreamID stream, EventPrincipal const *) noexcept
Definition: Worker.cc:147
void skipOnPath(EventPrincipal const &iEvent)
Definition: Worker.cc:381
void addContext(std::string const &context)
Definition: Exception.cc:169
virtual void implBeginStream(StreamID)=0
virtual void implEndStream(StreamID)=0
State state() const noexcept
HLT enums.
void checkForShouldTryToContinue(ModuleDescription const &)
Definition: Worker.cc:105
void runAcquire(EventTransitionInfo const &, ParentContext const &, WaitingTaskWithArenaHolder &)
Definition: Worker.cc:396
void endJob(GlobalContext const &)
Definition: Worker.cc:311
void postDoEvent(EventPrincipal const &)
Definition: Worker.cc:390
void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const &, Transition, ServiceToken const &) noexcept
Definition: Worker.cc:211
auto wrap(F iFunc) -> decltype(iFunc())
edm::propagate_const< EarlyDeleteHelper * > earlyDeleteHelper_
Definition: Worker.h:623
std::atomic< int > timesRun_
Definition: Worker.h:607
void beginJob(GlobalContext const &)
Definition: Worker.cc:292
virtual size_t transformIndex(edm::BranchDescription const &) const noexcept=0
Definition: Worker.cc:253
static Registry * instance()
Definition: Registry.cc:12
bool shouldTryToContinue_
Definition: Worker.h:629
void printCmsExceptionWarning(char const *behavior, cms::Exception const &e)