
CachingAllocator.h
#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"

// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

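    // integer exponentiation by squaring, e.g. power(2, 10) == 1024 and power(8, 3) == 512;
    // used to convert a bin index into the corresponding bin size in bytes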
    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }

    // format a memory size in B/kB/MB/GB
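    // e.g. as_bytes(1 << 20) yields "1 MB", while as_bytes(1000) yields "1000 B" because 1000 is
    // not an exact multiple of 1024; the "no limit" sentinel value is reported as "unlimited"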
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1 << 30) and value % (1 << 30) == 0) {
        return std::to_string(value >> 30) + " GB";
      } else if (value >= (1 << 20) and value % (1 << 20) == 0) {
        return std::to_string(value >> 20) + " MB";
      } else if (value >= (1 << 10) and value % (1 << 10) == 0) {
        return std::to_string(value >> 10) + " kB";
      } else {
        return std::to_string(value) + " B";
      }
    }

  }  // namespace detail

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated with a single memory `Device`, set at construction time, and unchanged
   * for the lifetime of the allocator.
   *
   * Each allocation is associated with an event on a queue, which identifies the "synchronisation device" according
   * to which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how to handle different queue and event types in a single allocator? Store and access type-punned
   * queues and events? Or template the internal structures on them, but with a common base class?
   * alpaka does rely on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator's memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is usually always the same (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
   *     be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   */
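  //
  // For illustration only (the concrete alpaka types below are examples, not mandated by this header):
  // with the CUDA backend, use case #1 corresponds to
  //   CachingAllocator<alpaka::DevCudaRt, alpaka::QueueCudaRtNonBlocking>
  // while use case #2 corresponds to
  //   CachingAllocator<alpaka::DevCpu, alpaka::QueueCudaRtNonBlocking>
  // i.e. pinned host memory whose reuse is synchronised with the work submitted to a GPU queue.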

  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
                  "host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };

    CachingAllocator(
        Device const& device,
        unsigned int binGrowth,          // bin growth factor;
        unsigned int minBin,             // smallest bin, corresponds to binGrowth^minBin bytes;
                                         //   smaller allocations are rounded to this value;
        unsigned int maxBin,             // largest bin, corresponds to binGrowth^maxBin bytes;
                                         //   larger allocations will fail;
        size_t maxCachedBytes,           // total storage for the allocator (0 means no limit);
        double maxCachedFraction,        // fraction of total device memory taken for the allocator (0 means no limit);
                                         //   if both maxCachedBytes and maxCachedFraction are non-zero,
                                         //   the smallest resulting value is used.
        bool reuseSameQueueAllocations,  // reuse non-ready allocations if they are in the same queue as the new one;
                                         //   this is safe only if all memory operations are scheduled in the same queue
        bool debug)
        : device_(device),
          binGrowth_(binGrowth),
          minBin_(minBin),
          maxBin_(maxBin),
          minBinBytes_(detail::power(binGrowth, minBin)),
          maxBinBytes_(detail::power(binGrowth, maxBin)),
          maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // allocate the given number of bytes on the current device, associated with the given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (not tryReuseCachedBlock(block)) {
        allocateNewBlock(block);
      }

      return block.buffer->data();
    }

    // frees an allocation
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
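        // record an event on the block's queue: the cached block becomes reusable only once this
        // event has completed, i.e. once all the work already submitted to the queue has finished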
        alpaka::enqueue(*(block.queue), *(block.event));
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }

  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
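    // (for example, with maxCachedBytes == 0 and maxCachedFraction == 0.8 on a device that reports
    //  16 GB of memory, the limit is 12.8 GB; if the platform cannot report the device memory,
    //  getMemBytes() returns 0 and only maxCachedBytes is taken into account)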
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }

    // return (bin, bin size)
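    // (for example, with binGrowth == 2 and minBin == 8, a request of 3000 bytes falls into
    //  bin 12 and is rounded up to a 4096-byte allocation)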
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching allocator with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }

    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
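        // a cached block can be reused either if it is associated with the same queue as the new
        // request, or if the event recorded when it was freed has completed, meaning that all the
        // work previously submitted to its old queue has finished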
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        using Platform = alpaka::Platform<alpaka::Dev<Queue>>;
        return alpaka::allocMappedBuf<Platform, std::byte, size_t>(device_, platform<Platform>(), bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                      "the host CPU.");
      }
    }

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // throw an exception if it fails again
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // geometric growth factor for bin sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;
  };

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
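
A minimal usage sketch, not part of the header above: it assumes a host-only configuration (both the "memory device" and the "synchronisation device" are the CPU), an alpaka version providing alpaka::PlatformCpu, alpaka::getDevByIdx(platform, index) and alpaka::QueueCpuBlocking, and purely illustrative bin parameters. A GPU backend would use the corresponding accelerator device and queue types instead.

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/CachingAllocator.h"

using Device = alpaka::DevCpu;           // the "memory device"
using Queue = alpaka::QueueCpuBlocking;  // the "synchronisation device" is also the CPU

void example() {
  Device host = alpaka::getDevByIdx(alpaka::PlatformCpu{}, 0u);
  Queue queue{host};

  // illustrative parameters: bins grow as powers of 2, from 2^8 = 256 B up to 2^30 = 1 GB
  cms::alpakatools::CachingAllocator<Device, Queue> allocator(host,
                                                              2,       // binGrowth
                                                              8,       // minBin
                                                              30,      // maxBin
                                                              0,       // maxCachedBytes: no absolute limit
                                                              0.5,     // maxCachedFraction: at most half of the host memory
                                                              true,    // reuseSameQueueAllocations
                                                              false);  // debug

  void* ptr = allocator.allocate(3000, queue);  // rounded up to the 4096-byte bin
  // ... use the memory, e.g. as the target of alpaka::memcpy/memset operations submitted to "queue" ...
  allocator.free(ptr);  // the block is kept in the cache for reuse, not returned to the system
}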