CMS 3D CMS Logo

CallbackBase.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 #ifndef FWCore_Framework_CallbackBase_h
3 #define FWCore_Framework_CallbackBase_h
4 //
5 // Package: Framework
6 // Class : CallbackBase
7 //
15 //
16 // Author: Chris Jones (original author, this was part of Callback.h),
17 // W. David Dagenhart (Refactored version + CallbackExternalWork, 2023)
18 // Created: Sun Apr 17 14:30:24 EDT 2005
19 //
20 
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <memory>
25 #include <optional>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 #include "oneapi/tbb/task_group.h"
31 
50 
51 namespace edm {
52  void exceptionContext(cms::Exception&, ESModuleCallingContext const&);
53 
54  namespace eventsetup {
55 
56  // The default decorator that does nothing
57  template <typename TRecord>
59  void pre(const TRecord&) {}
60  void post(const TRecord&) {}
61  };
62 
63  template <typename T, //producer's type
64  typename TProduceFunc, //produce functor type
65  typename TReturn, //return type of the producer's method
66  typename TRecord, //the record passed in as an argument
67  typename TDecorator> //allows customization using pre/post calls
68  class CallbackBase {
69  public:
70  CallbackBase(T* iProd, std::shared_ptr<TProduceFunc> iProduceFunc, unsigned int iID, const TDecorator& iDec)
71  : proxyData_{},
72  producer_(iProd),
73  callingContext_(&iProd->description(), iID),
74  produceFunction_(std::move(iProduceFunc)),
75  id_(iID),
77  decorator_(iDec) {}
78 
79  CallbackBase(const CallbackBase&) = delete;
80  CallbackBase& operator=(const CallbackBase&) = delete;
81  CallbackBase(CallbackBase&&) = delete;
82  CallbackBase& operator=(CallbackBase&&) = delete;
83 
84  template <typename ProduceFunctor> // ProduceFunctor executes TProduceFunc
85  WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group* group,
86  ServiceWeakToken const& serviceToken,
88  EventSetupImpl const* eventSetupImpl,
89  bool emitPostPrefetchingSignal,
90  ProduceFunctor&& produceFunctor) {
91  return WaitingTaskHolder(
92  *group,
93  make_waiting_task([this,
94  group,
95  serviceToken,
96  record,
97  eventSetupImpl,
98  emitPostPrefetchingSignal,
99  produceFunctor =
100  std::forward<ProduceFunctor>(produceFunctor)](std::exception_ptr const* iException) {
101  std::exception_ptr excptr;
102  if (iException) {
103  excptr = *iException;
104  }
105  if (emitPostPrefetchingSignal) {
106  try {
107  convertException::wrap([this, &serviceToken, &record] {
108  ServiceRegistry::Operate guard(serviceToken.lock());
109  record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(), callingContext_);
110  });
111  } catch (cms::Exception& caughtException) {
112  if (not excptr) {
113  exceptionContext(caughtException, callingContext_);
114  excptr = std::current_exception();
115  }
116  }
117  }
118  if (excptr) {
119  taskList_.doneWaiting(excptr);
120  return;
121  }
122 
123  producer_->queue().push(
124  *group, [this, serviceToken, record, eventSetupImpl, produceFunctor = std::move(produceFunctor)]() {
126  std::exception_ptr exceptPtr;
127  try {
128  convertException::wrap([this, &serviceToken, &record, &eventSetupImpl, &produceFunctor] {
130  auto proxies = getTokenIndices();
131  if (postMayGetResolvers_) {
132  proxies = &((*postMayGetResolvers_).front());
133  }
134  TRecord rec;
136  rec.setImpl(record, transitionID(), proxies, eventSetupImpl, &pc);
137  ServiceRegistry::Operate operate(serviceToken.lock());
138  record->activityRegistry()->preESModuleSignal_.emit(record->key(), context);
139  struct EndGuard {
140  EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
141  : record_{iRecord}, context_{iContext} {}
142  ~EndGuard() {
143  record_->activityRegistry()->postESModuleSignal_.emit(record_->key(), context_);
144  }
145  EventSetupRecordImpl const* record_;
146  ESModuleCallingContext const& context_;
147  };
148  EndGuard guard(record, context);
149  decorator_.pre(rec);
150  storeReturnedValues(produceFunctor(rec));
151  decorator_.post(rec);
152  });
153  } catch (cms::Exception& iException) {
154  exceptionContext(iException, callingContext_);
155  exceptPtr = std::current_exception();
156  }
157  taskList_.doneWaiting(exceptPtr);
158  });
159  }));
160  }
161 
162  template <typename RunModuleFnctr>
163  void prefetchAsyncImpl(RunModuleFnctr&& runModuleFnctr,
164  WaitingTaskHolder iTask,
165  EventSetupRecordImpl const* iRecord,
166  EventSetupImpl const* iEventSetupImpl,
167  ServiceToken const& token,
168  ESParentContext const& iParent) noexcept {
169  bool expected = false;
170  auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
171  taskList_.add(iTask);
172  if (doPrefetch) {
173  auto group = iTask.group();
174  ServiceWeakToken weakToken(token);
175  WaitingTaskHolder runModuleTaskHolder = runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
177  iRecord->activityRegistry()->preESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
178  if UNLIKELY (producer_->hasMayConsumes()) {
179  //after prefetching need to do the mayGet
180  auto mayGetTask = make_waiting_task(
181  [this, iRecord, iEventSetupImpl, weakToken, runModuleTaskHolder = std::move(runModuleTaskHolder)](
182  std::exception_ptr const* iExcept) mutable {
183  if (iExcept) {
184  runModuleTaskHolder.doneWaiting(*iExcept);
185  return;
186  }
187  if (handleMayGet(iRecord, iEventSetupImpl)) {
189  runModuleTaskHolder, iEventSetupImpl, &((*postMayGetResolvers_).front()), weakToken.lock());
190  } else {
191  runModuleTaskHolder.doneWaiting(std::exception_ptr{});
192  }
193  });
194 
195  //Get everything we can before knowing about the mayGets
196  prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
197  } else {
198  prefetchNeededDataAsync(runModuleTaskHolder, iEventSetupImpl, getTokenIndices(), token);
199  }
200  }
201  }
202 
203  template <class DataT>
204  void holdOntoPointer(DataT* iData) {
206  }
207 
208  template <class RemainingContainerT, class DataT, class ProductsT>
209  void setData(ProductsT& iProducts) {
210  DataT* temp = reinterpret_cast<DataT*>(proxyData_[produce::find_index<TReturn, DataT>::value]);
211  if (nullptr != temp) {
212  moveFromTo(iProducts, *temp);
213  }
214  if constexpr (not std::is_same_v<produce::Null, RemainingContainerT>) {
215  setData<typename RemainingContainerT::head_type, typename RemainingContainerT::tail_type>(iProducts);
216  }
217  }
218 
220  wasCalledForThisRecord_ = false;
221  taskList_.reset();
222  }
223 
224  unsigned int transitionID() const noexcept { return id_; }
225  ESResolverIndex const* getTokenIndices() const noexcept { return producer_->getTokenIndices(id_); }
226 
227  std::optional<std::vector<ESResolverIndex>> const& postMayGetResolvers() const { return postMayGetResolvers_; }
228  T* producer() noexcept { return producer_.get(); }
230  WaitingTaskList& taskList() noexcept { return taskList_; }
231  std::shared_ptr<TProduceFunc> const& produceFunction() noexcept { return produceFunction_; }
232  TDecorator const& decorator() const noexcept { return decorator_; }
233  SerialTaskQueueChain& queue() noexcept { return producer_->queue(); }
234 
235  protected:
236  ~CallbackBase() = default;
237 
238  private:
239  void storeReturnedValues(TReturn iReturn) {
241  setData<typename type::head_type, typename type::tail_type>(iReturn);
242  }
243 
245  EventSetupImpl const* iImpl,
246  ESResolverIndex const* proxies,
247  ServiceToken const& token) const noexcept {
248  auto recs = producer_->getTokenRecordIndices(id_);
249  auto n = producer_->numberOfTokenIndices(id_);
250  for (size_t i = 0; i != n; ++i) {
251  auto rec = iImpl->findImpl(recs[i]);
252  if (rec) {
253  rec->prefetchAsync(task, proxies[i], iImpl, token, ESParentContext{&callingContext_});
254  }
255  }
256  }
257 
258  bool handleMayGet(EventSetupRecordImpl const* iRecord, EventSetupImpl const* iEventSetupImpl) {
259  //Handle mayGets
260  TRecord rec;
262  rec.setImpl(iRecord, transitionID(), getTokenIndices(), iEventSetupImpl, &pc);
263  postMayGetResolvers_ = producer_->updateFromMayConsumes(id_, rec);
264  return static_cast<bool>(postMayGetResolvers_);
265  }
266 
268  std::optional<std::vector<ESResolverIndex>> postMayGetResolvers_;
272  // Using std::shared_ptr in order to share the state of the
273  // functors across all clones
274  std::shared_ptr<TProduceFunc> produceFunction_;
275  // This transition id identifies which setWhatProduced call this Callback is associated with
276  const unsigned int id_;
277  std::atomic<bool> wasCalledForThisRecord_;
278  TDecorator decorator_;
279  };
280  } // namespace eventsetup
281 } // namespace edm
282 
283 #endif
CallbackBase & operator=(const CallbackBase &)=delete
unsigned int transitionID() const noexcept
Definition: CallbackBase.h:224
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
TDecorator const & decorator() const noexcept
Definition: CallbackBase.h:232
CallbackBase(T *iProd, std::shared_ptr< TProduceFunc > iProduceFunc, unsigned int iID, const TDecorator &iDec)
Definition: CallbackBase.h:70
bool handleMayGet(EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl)
Definition: CallbackBase.h:258
void storeReturnedValues(TReturn iReturn)
Definition: CallbackBase.h:239
SerialTaskQueueChain & queue() noexcept
Definition: CallbackBase.h:233
void prefetchNeededDataAsync(WaitingTaskHolder task, EventSetupImpl const *iImpl, ESResolverIndex const *proxies, ServiceToken const &token) const noexcept
Definition: CallbackBase.h:244
std::optional< std::vector< ESResolverIndex > > const & postMayGetResolvers() const
Definition: CallbackBase.h:227
void reset()
Resets access to the resource so that added tasks will wait.
WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group *group, ServiceWeakToken const &serviceToken, EventSetupRecordImpl const *record, EventSetupImpl const *eventSetupImpl, bool emitPostPrefetchingSignal, ProduceFunctor &&produceFunctor)
Definition: CallbackBase.h:85
constexpr element_type const * get() const
propagate_const< T * > producer_
Definition: CallbackBase.h:269
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
ESModuleCallingContext & callingContext() noexcept
Definition: CallbackBase.h:229
void setData(ProductsT &iProducts)
Definition: CallbackBase.h:209
void holdOntoPointer(DataT *iData)
Definition: CallbackBase.h:204
void moveFromTo(FromT &iFrom, ToT &iTo)
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
WaitingTaskList & taskList() noexcept
Definition: CallbackBase.h:230
ServiceToken lock() const
Definition: ServiceToken.h:101
std::atomic< bool > wasCalledForThisRecord_
Definition: CallbackBase.h:277
void doneWaiting(std::exception_ptr iExcept) noexcept
std::optional< std::vector< ESResolverIndex > > postMayGetResolvers_
Definition: CallbackBase.h:268
void prefetchAsyncImpl(RunModuleFnctr &&runModuleFnctr, WaitingTaskHolder iTask, EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl, ServiceToken const &token, ESParentContext const &iParent) noexcept
Definition: CallbackBase.h:163
void setContext(State state, ESParentContext const &parent)
ESResolverIndex const * getTokenIndices() const noexcept
Definition: CallbackBase.h:225
HLT enums.
std::shared_ptr< TProduceFunc > produceFunction_
Definition: CallbackBase.h:274
std::array< void *, produce::size< TReturn >::value > proxyData_
Definition: CallbackBase.h:267
auto wrap(F iFunc) -> decltype(iFunc())
#define UNLIKELY(x)
Definition: Likely.h:21
std::shared_ptr< TProduceFunc > const & produceFunction() noexcept
Definition: CallbackBase.h:231
ESModuleCallingContext callingContext_
Definition: CallbackBase.h:270
long double T
def move(src, dest)
Definition: eostools.py:511