CMS 3D CMS Logo

CallbackExternalWork.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 #ifndef FWCore_Framework_CallbackExternalWork_h
3 #define FWCore_Framework_CallbackExternalWork_h
4 //
5 // Package: Framework
6 // Class : CallbackExternalWork
7 //
15 //
16 // Author: W. David Dagenhart
17 // Created: 27 February 2023
18 
19 #include <exception>
20 #include <memory>
21 #include <optional>
22 #include <utility>
23 #include <vector>
24 
25 #include "oneapi/tbb/task_group.h"
26 
42 
43 namespace edm {
44 
45  class EventSetupImpl;
46 
47  namespace eventsetup {
48 
49  namespace impl {
    // Trait that maps the return type of an acquire functor to the type used to
    // cache that return value, plus a static accessor value() that exposes the
    // cached object. The cross-reference text of this listing shows the enclosing
    // class declaring `impl::AcquireCacheType<TAcquireReturn>::type acquireCache_`.
    //
    // NOTE(review): listing lines 51, 56 and 61 (the struct header lines of the
    // first three definitions) are missing from this extraction; the descriptions
    // below of which type each one applies to are assumptions - confirm against
    // the original header.

    // Presumably the primary template (header line 51 missing): the cache is a
    // std::optional<U>, and value() returns a reference to the contained U via
    // std::optional::value().
50  template <typename U>
52  using type = std::optional<U>;
53  static U& value(type& val) { return val.value(); }
54  };
    // Presumably the specialization for std::optional<U> (header line 56 missing):
    // the cache is a std::optional<U> and value() returns the optional itself
    // rather than its contents.
55  template <typename U>
57  using type = std::optional<U>;
58  static std::optional<U>& value(type& val) { return val; }
59  };
    // Presumably the specialization for std::unique_ptr<U> (header line 61
    // missing): the cache is the unique_ptr and value() returns it unchanged.
60  template <typename U>
62  using type = std::unique_ptr<U>;
63  static std::unique_ptr<U>& value(type& val) { return val; }
64  };
    // Specialization for std::shared_ptr<U>: the cache is the shared_ptr and
    // value() returns it unchanged.
65  template <typename U>
66  struct AcquireCacheType<std::shared_ptr<U>> {
67  using type = std::shared_ptr<U>;
68  static std::shared_ptr<U>& value(type& val) { return val; }
69  };
70  } // namespace impl
71 
// Callback that runs an EventSetup producer in two steps: an "acquire" functor
// followed by a "produce" functor. It derives from CallbackBase and adds the
// acquire functor (acquireFunction_) and a cache for the acquire functor's
// return value (acquireCache_).
//
// NOTE(review): this listing is an extraction in which several source lines are
// missing (e.g. 82, 84, 95, 97, 100, 105, 110, 118, 130, 137, 140, 144, 155,
// 170, 174, 176, 180, 205, 207, 219, 227). Comments below that describe missing
// lines are assumptions based on the surrounding code and on the signatures in
// this page's cross-reference text - confirm against the original header.
72  template <typename T, //producer's type
73  typename TAcquireFunc, //acquire functor type
74  typename TAcquireReturn, //return type of the acquire method
75  typename TProduceFunc, //produce functor type
76  typename TProduceReturn, //return type of the produce method
77  typename TRecord, //the record passed in as an argument
78  typename TDecorator //allows customization using pre/post calls
79  = CallbackSimpleDecorator<TRecord>>
80  class CallbackExternalWork : public CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator> {
81  public:
83 
    // Public constructor (first signature line 84 missing; presumably
    // `CallbackExternalWork(T* iProd,`). Moves both functors into shared_ptrs and
    // delegates to the private constructor that takes shared_ptrs.
85  TAcquireFunc iAcquireFunc,
86  TProduceFunc iProduceFunc,
87  unsigned int iID,
88  const TDecorator& iDec = TDecorator())
89  : CallbackExternalWork(iProd,
90  std::make_shared<TAcquireFunc>(std::move(iAcquireFunc)),
91  std::make_shared<TProduceFunc>(std::move(iProduceFunc)),
92  iID,
93  iDec) {}
94 
    // Member function whose signature (line 95) and constructor arguments
    // (line 97) are missing; the visible part returns a raw pointer to a newly
    // allocated CallbackExternalWork. NOTE(review): presumably a clone-style
    // factory whose caller takes ownership - confirm.
96  return new CallbackExternalWork(
98  }
99 
    // prefetchAsync (first signature line 100 missing; presumably
    // `void prefetchAsync(WaitingTaskHolder iTask,`). The call on missing line 105
    // is presumably Base::prefetchAsyncImpl; it receives the lambda below together
    // with the task, record, EventSetupImpl, service token and parent context.
101  EventSetupRecordImpl const* iRecord,
102  EventSetupImpl const* iEventSetupImpl,
103  ServiceToken const& token,
104  ESParentContext const& iParent) noexcept {
    // The lambda builds the task chain and returns its first task (the acquire
    // task).
106  [this](auto&& group, auto&& token, auto&& record, auto&& es) {
107  constexpr bool emitPostPrefetchingSignal = false;
    // Functor run by the produce task: calls the produce function (its arguments
    // are on missing line 110), then resets acquireCache_ before returning the
    // produced value.
108  auto produceFunctor = [this](TRecord const& record) {
109  auto returnValue = (*Base::produceFunction())(
111  acquireCache_.reset();
112  return returnValue;
113  };
114  WaitingTaskHolder produceTask =
115  Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
116 
    // Initializer on missing line 118; presumably
    // makeExceptionHandlerTask(std::move(produceTask), group) - confirm.
117  WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
119 
120  return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
121  },
122  std::move(iTask),
123  iRecord,
124  iEventSetupImpl,
125  token,
126  iParent);
127  }
128 
129  private:
    // Private constructor (first signature line 130 missing). Forwards the
    // producer pointer, produce functor, ID and decorator to the base class and
    // stores the acquire functor in acquireFunction_.
131  std::shared_ptr<TAcquireFunc> iAcquireFunc,
132  std::shared_ptr<TProduceFunc> iProduceFunc,
133  unsigned int iID,
134  const TDecorator& iDec = TDecorator())
135  : Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
136 
    // makeAcquireTask (signature lines 137 and 140 missing; the lambda below
    // captures `waitingTaskWithArenaHolder` and `record`, which are presumably
    // the parameters declared on those lines). Returns a WaitingTaskHolder
    // wrapping a task (created on missing line 144) that runs the lambda below.
138  oneapi::tbb::task_group* group,
139  ServiceWeakToken const& serviceToken,
141  EventSetupImpl const* eventSetupImpl) {
142  return WaitingTaskHolder(
143  *group,
145  [this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
146  std::exception_ptr const* iException) mutable {
    // Start from any exception passed in to this task.
147  std::exception_ptr excptr;
148  if (iException) {
149  excptr = *iException;
150  }
    // Emit the post-prefetching signal with the services made available
    // through the guard (remaining emit arguments are on missing line 155).
151  try {
152  convertException::wrap([this, &serviceToken, &record] {
153  ServiceRegistry::Operate guard(serviceToken.lock());
154  record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(),
156  });
157  } catch (cms::Exception& caughtException) {
    // An exception from the signal is kept only if no earlier exception
    // was already recorded.
158  if (not excptr) {
159  edm::exceptionContext(caughtException, Base::callingContext());
160  excptr = std::current_exception();
161  }
162  }
    // On any exception, report it through Base::taskList() and do not
    // queue the acquire step.
163  if (excptr) {
164  Base::taskList().doneWaiting(excptr);
165  return;
166  }
167 
    // Otherwise push the acquire step onto Base::queue().
168  Base::queue().push(
169  *group, [this, holder = std::move(holder), serviceToken, record, eventSetupImpl]() mutable {
171  std::exception_ptr exceptPtr;
172  try {
173  convertException::wrap([this, &holder, &serviceToken, &record, &eventSetupImpl] {
    // NOTE(review): `context` and `pc`, used below, are presumably
    // declared on missing lines (170/174/180); the condition guarding
    // the `proxies` reassignment is on missing line 176.
175  auto proxies = Base::getTokenIndices();
177  proxies = &((*Base::postMayGetResolvers()).front());
178  }
179  TRecord rec;
181  rec.setImpl(record, Base::transitionID(), proxies, eventSetupImpl, &pc);
182  ServiceRegistry::Operate operate(serviceToken.lock());
183  record->activityRegistry()->preESModuleAcquireSignal_.emit(record->key(), context);
    // Local guard whose destructor emits the post-acquire signal, so
    // the signal is emitted when `guard` goes out of scope, including
    // when the acquire functor throws.
184  struct EndGuard {
185  EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
186  : record_{iRecord}, context_{iContext} {}
187  ~EndGuard() {
188  record_->activityRegistry()->postESModuleAcquireSignal_.emit(record_->key(), context_);
189  }
190  EventSetupRecordImpl const* record_;
191  ESModuleCallingContext const& context_;
192  };
193  EndGuard guard(record, context);
    // Run the acquire functor, passing it the record and the holder,
    // and cache its return value for the produce step.
194  acquireCache_ = (*acquireFunction_)(rec, holder);
195  });
196  } catch (cms::Exception& iException) {
197  iException.addContext("Running acquire");
198  exceptPtr = std::current_exception();
199  }
    // Signal the holder, passing along the exception (null on success).
200  holder.doneWaiting(exceptPtr);
201  });
202  }));
203  }
204 
    // makeExceptionHandlerTask (signature line 205 and line 207 missing; the
    // cross-reference text gives the signature as
    // `WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
    // oneapi::tbb::task_group* group)`). The task created here owns produceTask
    // and, when it receives an exception, adds context to it and forwards it to
    // produceTask.
206  oneapi::tbb::task_group* group) {
208  make_waiting_task([this, produceTask = std::move(produceTask)](
209  std::exception_ptr const* iException) mutable {
210  std::exception_ptr excptr;
211  if (iException) {
212  excptr = *iException;
213  }
214  if (excptr) {
    // Rethrow through convertException::wrap so the exception can be caught
    // as a cms::Exception and annotated.
215  try {
216  convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
217  } catch (cms::Exception& exception) {
218  exception.addContext("Running acquire and external work");
    // NOTE(review): listing line 219 is missing from this extraction.
220  produceTask.doneWaiting(std::current_exception());
221  }
222  }
223  }));
224  }
225 
    // Acquire functor, called from the queued acquire step in makeAcquireTask.
226  std::shared_ptr<TAcquireFunc> acquireFunction_;
    // NOTE(review): listing line 227 is missing; per the cross-reference text it
    // presumably declares `impl::AcquireCacheType<TAcquireReturn>::type acquireCache_`.
228  };
229  } // namespace eventsetup
230 } // namespace edm
231 
232 #endif
CallbackExternalWork(T *iProd, TAcquireFunc iAcquireFunc, TProduceFunc iProduceFunc, unsigned int iID, const TDecorator &iDec=TDecorator())
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
std::shared_ptr< TAcquireFunc > acquireFunction_
std::optional< std::vector< ESResolverIndex > > const & postMayGetResolvers() const
Definition: CallbackBase.h:227
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
CallbackExternalWork(T *iProd, std::shared_ptr< TAcquireFunc > iAcquireFunc, std::shared_ptr< TProduceFunc > iProduceFunc, unsigned int iID, const TDecorator &iDec=TDecorator())
void prefetchAsync(WaitingTaskHolder iTask, EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl, ServiceToken const &token, ESParentContext const &iParent) noexcept
WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group *group, ServiceWeakToken const &serviceToken, EventSetupRecordImpl const *record, EventSetupImpl const *eventSetupImpl, bool emitPostPrefetchingSignal, ProduceFunctor &&produceFunctor)
Definition: CallbackBase.h:85
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
optional
Definition: Types.py:245
ServiceToken lock() const
Definition: ServiceToken.h:101
void doneWaiting(std::exception_ptr iExcept) noexcept
WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group *group)
void addContext(std::string const &context)
Definition: Exception.cc:169
void prefetchAsyncImpl(RunModuleFnctr &&runModuleFnctr, WaitingTaskHolder iTask, EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl, ServiceToken const &token, ESParentContext const &iParent) noexcept
Definition: CallbackBase.h:163
impl::AcquireCacheType< TAcquireReturn >::type acquireCache_
HLT enums.
auto wrap(F iFunc) -> decltype(iFunc())
long double T
WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder, oneapi::tbb::task_group *group, ServiceWeakToken const &serviceToken, EventSetupRecordImpl const *record, EventSetupImpl const *eventSetupImpl)
def move(src, dest)
Definition: eostools.py:511
void push(oneapi::tbb::task_group &iGroup, T &&iAction)
asynchronously pushes functor iAction into queue