CMS 3D CMS Logo

CallbackBase.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 #ifndef FWCore_Framework_CallbackBase_h
3 #define FWCore_Framework_CallbackBase_h
4 //
5 // Package: Framework
6 // Class : CallbackBase
7 //
15 //
16 // Author: Chris Jones (original author, this was part of Callback.h),
17 // W. David Dagenhart (Refactored version + CallbackExternalWork, 2023)
18 // Created: Sun Apr 17 14:30:24 EDT 2005
19 //
20 
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <memory>
25 #include <optional>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 #include "oneapi/tbb/task_group.h"
31 
50 
51 namespace edm {
52  void exceptionContext(cms::Exception&, ESModuleCallingContext const&);
53 
54  namespace eventsetup {
55 
56  // The default decorator that does nothing
57  template <typename TRecord>
59  void pre(const TRecord&) {}
60  void post(const TRecord&) {}
61  };
62 
63  template <typename T, //producer's type
64  typename TProduceFunc, //produce functor type
65  typename TReturn, //return type of the producer's method
66  typename TRecord, //the record passed in as an argument
67  typename TDecorator> //allows customization using pre/post calls
68  class CallbackBase {
69  public:
// Constructs a callback bound to producer iProd.
//  iProd        - producer whose method will be run; its description() seeds callingContext_.
//  iProduceFunc - shared functor implementing the produce step (moved into produceFunction_).
//  iID          - transition id stored in id_.
//  iDec         - decorator copied into decorator_.
// proxyData_ is value-initialized.
// NOTE(review): this listing omits original line 76; presumably wasCalledForThisRecord_ is
// initialized there - confirm against the real header.
70  CallbackBase(T* iProd, std::shared_ptr<TProduceFunc> iProduceFunc, unsigned int iID, const TDecorator& iDec)
71  : proxyData_{},
72  producer_(iProd),
73  callingContext_(&iProd->description()),
74  produceFunction_(std::move(iProduceFunc)),
75  id_(iID),
77  decorator_(iDec) {}
78 
79  CallbackBase(const CallbackBase&) = delete;
80  CallbackBase& operator=(const CallbackBase&) = delete;
81  CallbackBase(CallbackBase&&) = delete;
82  CallbackBase& operator=(CallbackBase&&) = delete;
83 
// Builds the WaitingTaskHolder (created in *group) that runs this callback's produce step.
// When the task runs it:
//   1. adopts the exception passed in through iException, if any;
//   2. if emitPostPrefetchingSignal is true, emits postESModulePrefetchingSignal_; a
//      cms::Exception caught from that emit is kept only when no earlier exception exists;
//   3. if an exception is held, completes taskList_ with it and returns;
//   4. otherwise pushes the module work onto producer_->queue() and completes taskList_
//      with the outcome (a null exception_ptr when nothing was caught).
// NOTE(review): this listing omits original lines 86, 116, 120, 126 and 139. The `record`
// parameter and the locals `context` and `pc` used below are declared on omitted lines.
84  WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group* group,
85  ServiceWeakToken const& serviceToken,
87  EventSetupImpl const* eventSetupImpl,
88  bool emitPostPrefetchingSignal) {
89  return WaitingTaskHolder(
90  *group,
91  make_waiting_task([this, group, serviceToken, record, eventSetupImpl, emitPostPrefetchingSignal](
92  std::exception_ptr const* iException) {
// Start from the exception, if any, that was handed to this task.
93  std::exception_ptr excptr;
94  if (iException) {
95  excptr = *iException;
96  }
97  if (emitPostPrefetchingSignal) {
98  try {
// Emit the signal while the services from serviceToken are active.
99  convertException::wrap([this, &serviceToken, &record] {
100  ServiceRegistry::Operate guard(serviceToken.lock());
101  record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(), callingContext_);
102  });
103  } catch (cms::Exception& caughtException) {
// Keep the first failure: record this one only if nothing failed earlier.
104  if (not excptr) {
105  exceptionContext(caughtException, callingContext_);
106  excptr = std::current_exception();
107  }
108  }
109  }
// A failure so far: hand the exception to the waiting tasks and do not run the module.
110  if (excptr) {
111  taskList_.doneWaiting(excptr);
112  return;
113  }
114 
// Run the module work through the producer's queue.
115  producer_->queue().push(*group, [this, serviceToken, record, eventSetupImpl]() {
117  std::exception_ptr exceptPtr;
118  try {
119  convertException::wrap([this, &serviceToken, &record, &eventSetupImpl] {
// Prefer the proxy indices stored in postMayGetProxies_ when it holds a value.
121  auto proxies = getTokenIndices();
122  if (postMayGetProxies_) {
123  proxies = &((*postMayGetProxies_).front());
124  }
125  TRecord rec;
127  rec.setImpl(record, transitionID(), proxies, eventSetupImpl, &pc);
128  ServiceRegistry::Operate operate(serviceToken.lock());
129  record->activityRegistry()->preESModuleSignal_.emit(record->key(), context);
// EndGuard emits postESModuleSignal_ from its destructor, so the post signal is sent
// when the scope is left, including when an exception unwinds through it.
130  struct EndGuard {
131  EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
132  : record_{iRecord}, context_{iContext} {}
133  ~EndGuard() { record_->activityRegistry()->postESModuleSignal_.emit(record_->key(), context_); }
134  EventSetupRecordImpl const* record_;
135  ESModuleCallingContext const& context_;
136  };
137  EndGuard guard(record, context);
// NOTE(review): original line 139 is missing between pre() and post(); presumably the
// produce functor is invoked and its result stored there - confirm.
138  decorator_.pre(rec);
140  decorator_.post(rec);
141  });
142  } catch (cms::Exception& iException) {
143  exceptionContext(iException, callingContext_);
144  exceptPtr = std::current_exception();
145  }
// Release the waiting tasks; exceptPtr is null when nothing was caught.
146  taskList_.doneWaiting(exceptPtr);
147  });
148  }));
149  }
150 
// Adds iTask to taskList_ and, for the one caller that flips wasCalledForThisRecord_ from
// false to true, starts the prefetch chain that ends in the task returned by runModuleFnctr.
//  runModuleFnctr  - called as runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
//                    must return the WaitingTaskHolder to run once prefetching is done.
//  iTask           - task to notify; its group() is used for the tasks created here.
//  iRecord         - record forwarded to runModuleFnctr and handleMayGet.
//  iEventSetupImpl - forwarded to runModuleFnctr, handleMayGet and prefetchNeededDataAsync.
//  token           - service token; a ServiceWeakToken copy is captured by the tasks.
//  iParent         - not referenced in the visible lines.
// NOTE(review): this listing omits original lines 165-166 and 177; iParent may be used on
// 165-166, and line 177 holds the start of the call whose arguments appear on line 178.
151  template <typename RunModuleFnctr>
152  void prefetchAsyncImpl(RunModuleFnctr&& runModuleFnctr,
153  WaitingTaskHolder iTask,
154  EventSetupRecordImpl const* iRecord,
155  EventSetupImpl const* iEventSetupImpl,
156  ServiceToken const& token,
157  ESParentContext const& iParent) {
// Only the caller that wins this exchange sets up prefetching; every caller's task is queued.
158  bool expected = false;
159  auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
160  taskList_.add(iTask);
161  if (doPrefetch) {
162  auto group = iTask.group();
163  ServiceWeakToken weakToken(token);
164  WaitingTaskHolder runModuleTaskHolder = runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
// Two-stage prefetch when the producer reports mayConsumes: first the known token
// indices, then (inside mayGetTask) whatever handleMayGet adds.
167  if UNLIKELY (producer_->hasMayConsumes()) {
168  //after prefetching need to do the mayGet
169  auto mayGetTask = make_waiting_task(
170  [this, iRecord, iEventSetupImpl, weakToken, runModuleTaskHolder = std::move(runModuleTaskHolder)](
171  std::exception_ptr const* iExcept) mutable {
// First-stage failure: pass the exception on to the run-module task.
172  if (iExcept) {
173  runModuleTaskHolder.doneWaiting(*iExcept);
174  return;
175  }
// handleMayGet returns true when postMayGetProxies_ now holds a value; those indices are
// then passed to the call begun on the omitted line 177 (presumably
// prefetchNeededDataAsync - confirm).
176  if (handleMayGet(iRecord, iEventSetupImpl)) {
178  runModuleTaskHolder, iEventSetupImpl, &((*postMayGetProxies_).front()), weakToken.lock());
179  } else {
// Nothing more to prefetch: let the run-module task proceed without an exception.
180  runModuleTaskHolder.doneWaiting(std::exception_ptr{});
181  }
182  });
183 
184  //Get everything we can before knowing about the mayGets
185  prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
186  } else {
187  prefetchNeededDataAsync(runModuleTaskHolder, iEventSetupImpl, getTokenIndices(), token);
188  }
189  }
190  }
191 
192  template <class DataT>
193  void holdOntoPointer(DataT* iData) {
195  }
196 
// Transfers the DataT part of iProducts to the destination recorded in proxyData_, then
// recurses over the remaining types.
//  RemainingContainerT - type list still to process; produce::Null ends the recursion.
//  DataT               - type handled by this instantiation.
//  iProducts           - source object passed to moveFromTo for every type in the list.
// The destination is the proxyData_ entry at index produce::find_index<TReturn, DataT>::value,
// reinterpreted as DataT*; a null entry is skipped.
197  template <class RemainingContainerT, class DataT, class ProductsT>
198  void setData(ProductsT& iProducts) {
199  DataT* temp = reinterpret_cast<DataT*>(proxyData_[produce::find_index<TReturn, DataT>::value]);
200  if (nullptr != temp) {
201  moveFromTo(iProducts, *temp);
202  }
// Compile-time recursion: continue with the head_type/tail_type of the remaining list.
203  if constexpr (not std::is_same_v<produce::Null, RemainingContainerT>) {
204  setData<typename RemainingContainerT::head_type, typename RemainingContainerT::tail_type>(iProducts);
205  }
206  }
207 
209  wasCalledForThisRecord_ = false;
210  taskList_.reset();
211  }
212 
// Returns the transition id (id_) given at construction.
213  unsigned int transitionID() const { return id_; }
// Returns the producer's token indices for this callback's transition id.
214  ESProxyIndex const* getTokenIndices() const { return producer_->getTokenIndices(id_); }
215 
// Proxy indices stored by handleMayGet; empty optional when none were stored.
216  std::optional<std::vector<ESProxyIndex>> const& postMayGetProxies() const { return postMayGetProxies_; }
// Raw pointer to the producer this callback is bound to.
217  T* producer() { return producer_.get(); }
// Shared produce functor held by this callback.
220  std::shared_ptr<TProduceFunc> const& produceFunction() { return produceFunction_; }
// Decorator copied in at construction.
221  TDecorator const& decorator() const { return decorator_; }
// The producer's task queue, as returned by producer_->queue().
222  SerialTaskQueueChain& queue() { return producer_->queue(); }
223 
224  protected:
225  ~CallbackBase() = default;
226 
227  private:
// Distributes the value returned by the produce step by starting the setData recursion
// with the head_type/tail_type of `type`.
// NOTE(review): `type` is declared on original line 229, which this listing omits.
228  void storeReturnedValues(TReturn iReturn) {
230  setData<typename type::head_type, typename type::tail_type>(iReturn);
231  }
232 
234  EventSetupImpl const* iImpl,
235  ESProxyIndex const* proxies,
236  ServiceToken const& token) const {
237  auto recs = producer_->getTokenRecordIndices(id_);
238  auto n = producer_->numberOfTokenIndices(id_);
239  for (size_t i = 0; i != n; ++i) {
240  auto rec = iImpl->findImpl(recs[i]);
241  if (rec) {
242  rec->prefetchAsync(task, proxies[i], iImpl, token, ESParentContext{&callingContext_});
243  }
244  }
245  }
246 
// Asks the producer to resolve its mayConsumes for this record and stores the result.
// A TRecord is bound to iRecord using this callback's transition id and token indices, then
// passed to producer_->updateFromMayConsumes(id_, rec); the returned optional is stored in
// postMayGetProxies_.
// Returns true when postMayGetProxies_ holds a value afterwards.
// NOTE(review): `pc` is declared on original line 250, which this listing omits.
247  bool handleMayGet(EventSetupRecordImpl const* iRecord, EventSetupImpl const* iEventSetupImpl) {
248  //Handle mayGets
249  TRecord rec;
251  rec.setImpl(iRecord, transitionID(), getTokenIndices(), iEventSetupImpl, &pc);
252  postMayGetProxies_ = producer_->updateFromMayConsumes(id_, rec);
253  return static_cast<bool>(postMayGetProxies_);
254  }
255 
257  std::optional<std::vector<ESProxyIndex>> postMayGetProxies_;
261  // Using std::shared_ptr in order to share the state of the
262  // functors across all clones
263  std::shared_ptr<TProduceFunc> produceFunction_;
264  // This transition id identifies which setWhatProduced call this Callback is associated with
265  const unsigned int id_;
266  std::atomic<bool> wasCalledForThisRecord_;
267  TDecorator decorator_;
268  };
269  } // namespace eventsetup
270 } // namespace edm
271 
272 #endif
CallbackBase & operator=(const CallbackBase &)=delete
ESProxyIndex const * getTokenIndices() const
Definition: CallbackBase.h:214
void prefetchNeededDataAsync(WaitingTaskHolder task, EventSetupImpl const *iImpl, ESProxyIndex const *proxies, ServiceToken const &token) const
Definition: CallbackBase.h:233
void exceptionContext(cms::Exception &, ESModuleCallingContext const &)
void prefetchAsync(WaitingTaskHolder iTask, ESProxyIndex iProxyIndex, EventSetupImpl const *, ServiceToken const &, ESParentContext) const
Prefetch the data to set up for subsequent calls to getImplementation.
PreESModulePrefetching preESModulePrefetchingSignal_
CallbackBase(T *iProd, std::shared_ptr< TProduceFunc > iProduceFunc, unsigned int iID, const TDecorator &iDec)
Definition: CallbackBase.h:70
bool handleMayGet(EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl)
Definition: CallbackBase.h:247
WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group *group, ServiceWeakToken const &serviceToken, EventSetupRecordImpl const *record, EventSetupImpl const *eventSetupImpl, bool emitPostPrefetchingSignal)
Definition: CallbackBase.h:84
void storeReturnedValues(TReturn iReturn)
Definition: CallbackBase.h:228
void reset()
Resets access to the resource so that added tasks will wait.
constexpr element_type const * get() const
propagate_const< T * > producer_
Definition: CallbackBase.h:258
oneapi::tbb::task_group * group() const noexcept
void emit(Args &&... args) const
Definition: Signal.h:48
std::optional< std::vector< ESProxyIndex > > const & postMayGetProxies() const
Definition: CallbackBase.h:216
void doneWaiting(std::exception_ptr iPtr)
Signals that the resource is now available and tasks should be spawned.
void doneWaiting(std::exception_ptr iExcept)
void setData(ProductsT &iProducts)
Definition: CallbackBase.h:198
void holdOntoPointer(DataT *iData)
Definition: CallbackBase.h:193
void moveFromTo(FromT &iFrom, ToT &iTo)
void add(oneapi::tbb::task_group *, WaitingTask *)
Adds task to the waiting list.
FunctorWaitingTask< F > * make_waiting_task(F f)
Definition: WaitingTask.h:92
std::shared_ptr< TProduceFunc > const & produceFunction()
Definition: CallbackBase.h:220
ServiceToken lock() const
Definition: ServiceToken.h:101
std::atomic< bool > wasCalledForThisRecord_
Definition: CallbackBase.h:266
WaitingTaskList & taskList()
Definition: CallbackBase.h:219
unsigned int transitionID() const
Definition: CallbackBase.h:213
SerialTaskQueueChain & queue()
Definition: CallbackBase.h:222
EventSetupRecordKey const & key() const
std::optional< std::vector< ESProxyIndex > > postMayGetProxies_
Definition: CallbackBase.h:257
void setContext(State state, ESParentContext const &parent)
HLT enums.
std::shared_ptr< TProduceFunc > produceFunction_
Definition: CallbackBase.h:263
TDecorator const & decorator() const
Definition: CallbackBase.h:221
std::array< void *, produce::size< TReturn >::value > proxyData_
Definition: CallbackBase.h:256
auto wrap(F iFunc) -> decltype(iFunc())
#define UNLIKELY(x)
Definition: Likely.h:21
ESModuleCallingContext callingContext_
Definition: CallbackBase.h:259
ESModuleCallingContext & callingContext()
Definition: CallbackBase.h:218
long double T
eventsetup::EventSetupRecordImpl const * findImpl(const eventsetup::EventSetupRecordKey &) const
def move(src, dest)
Definition: eostools.py:511
ActivityRegistry const * activityRegistry() const noexcept
void prefetchAsyncImpl(RunModuleFnctr &&runModuleFnctr, WaitingTaskHolder iTask, EventSetupRecordImpl const *iRecord, EventSetupImpl const *iEventSetupImpl, ServiceToken const &token, ESParentContext const &iParent)
Definition: CallbackBase.h:152