CachingAllocator.h
#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <cstdint>
#include <cstring>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

// CMSSW headers for the AllocatorConfig settings, the AlpakaService forward declarations,
// and the platform<>() helper used below
#include "HeterogeneousCore/AlpakaInterface/interface/AllocatorConfig.h"
#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"
#include "HeterogeneousCore/AlpakaInterface/interface/devices.h"

// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

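    // Integer exponentiation by squaring; in this file it is used to compute the allocation
    // bin sizes as binGrowth^bin (e.g. power(2, 20) is 1 MiB).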
    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }

    // format a memory size in B/KiB/MiB/GiB/TiB
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1ul << 40) and value % (1ul << 40) == 0) {
        return std::to_string(value >> 40) + " TiB";
      } else if (value >= (1ul << 30) and value % (1ul << 30) == 0) {
        return std::to_string(value >> 30) + " GiB";
      } else if (value >= (1ul << 20) and value % (1ul << 20) == 0) {
        return std::to_string(value >> 20) + " MiB";
      } else if (value >= (1ul << 10) and value % (1ul << 10) == 0) {
        return std::to_string(value >> 10) + " KiB";
      } else {
        return std::to_string(value) + " B";
      }
    }

  }  // namespace detail

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged
   * for the lifetime of the allocator.
   *
   * Each allocation is associated to an event on a queue, that identifies the "synchronisation device" according to
   * which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how to handle different queue and event types in a single allocator? Store and access type-punned
   * queues and events? Or template the internal structures on them, but with a common base class?
   * alpaka does rely on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator's memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is usually always the same (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
   *     be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   */

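  /*
   * Minimal usage sketch (illustrative only): the way the device and queue are obtained depends on the chosen
   * alpaka backend, and AllocatorConfig is assumed here to be default-constructible with reasonable defaults.
   *
   *   Device device = alpaka::getDevByIdx(alpaka::Platform<Device>{}, 0u);
   *   Queue queue{device};
   *   CachingAllocator<Device, Queue> allocator(device, AllocatorConfig{}, true);
   *
   *   void* ptr = allocator.allocate(1024, queue);  // the request is rounded up to the bin size
   *   // ... submit work that uses the memory to "queue" ...
   *   allocator.free(ptr);                          // the block is cached and may be reused by a later allocate()
   */
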
  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                  "the host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };

    CachingAllocator(
        Device const& device,
        AllocatorConfig const& config,
        bool reuseSameQueueAllocations,  // Reuse non-ready allocations if they are in the same queue as the new one;
                                         // this is safe only if all memory operations are scheduled in the same
                                         // queue. In particular, this is not safe if the memory will be accessed
                                         // without using any queue, like host memory accessed directly or with
                                         // immediate operations.
        bool debug = false)
        : device_(device),
          binGrowth_(config.binGrowth),
          minBin_(config.minBin),
          maxBin_(config.maxBin),
          minBinBytes_(detail::power(binGrowth_, minBin_)),
          maxBinBytes_(detail::power(binGrowth_, maxBin_)),
          maxCachedBytes_(cacheSize(config.maxCachedBytes, config.maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug),
          fillAllocations_(config.fillAllocations),
          fillAllocationValue_(config.fillAllocationValue),
          fillReallocations_(config.fillReallocations),
          fillReallocationValue_(config.fillReallocationValue),
          fillDeallocations_(config.fillDeallocations),
          fillDeallocationValue_(config.fillDeallocationValue),
          fillCaches_(config.fillCaches),
          fillCacheValue_(config.fillCacheValue) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth_, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // Fill a memory buffer with the specified byte value.
    // If the underlying device is the host and the allocator is configured to support immediate
    // (non queue-ordered) operations, fill the memory synchronously using std::memset.
    // Otherwise, let the alpaka queue schedule the operation.
    //
    // This is not used for deallocation/caching, because the memory may still be in use until the
    // corresponding event is reached.
    void immediateOrAsyncMemset(Queue queue, Buffer buffer, uint8_t value) {
      // host-only
      if (std::is_same_v<Device, alpaka::DevCpu> and not reuseSameQueueAllocations_) {
        std::memset(buffer.data(), value, alpaka::getExtentProduct(buffer) * sizeof(alpaka::Elem<Buffer>));
      } else {
        alpaka::memset(queue, buffer, value);
      }
    }

    // Allocate given number of bytes on the current device associated to given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (tryReuseCachedBlock(block)) {
        // fill the re-used memory block with a pattern
        if (fillReallocations_) {
          immediateOrAsyncMemset(*block.queue, *block.buffer, fillReallocationValue_);
        } else if (fillAllocations_) {
          immediateOrAsyncMemset(*block.queue, *block.buffer, fillAllocationValue_);
        }
      } else {
        allocateNewBlock(block);
        // fill the newly allocated memory block with a pattern
        if (fillAllocations_) {
          immediateOrAsyncMemset(*block.queue, *block.buffer, fillAllocationValue_);
        }
      }

      return block.buffer->data();
    }

    // frees an allocation
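    // If the total cached memory would stay within maxCachedBytes_, the block is put back into the cache together
    // with an event recorded on its last queue, so it can be reused once that work has completed; otherwise the
    // underlying buffer is released as soon as the block descriptor goes out of scope.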
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        // If enqueuing the event fails, very likely an error has
        // occurred in the asynchronous processing. In that case the
        // error will show up in all device API function calls, and
        // the free() will be called by destructors during stack
        // unwinding. In order to avoid terminate() being called
        // because of multiple exceptions it is best to ignore these
        // errors.
        try {
          // fill memory blocks with a pattern before caching them
          if (fillCaches_) {
            alpaka::memset(*block.queue, *block.buffer, fillCacheValue_);
          } else if (fillDeallocations_) {
            alpaka::memset(*block.queue, *block.buffer, fillDeallocationValue_);
          }
          // record in the block a marker associated to the work queue
          alpaka::enqueue(*(block.queue), *(block.event));
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() caught an alpaka error: " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // If the memset fails, very likely an error has occurred in the
        // asynchronous processing. In that case the error will show up in all
        // device API function calls, and the free() will be called by
        // destructors during stack unwinding. In order to avoid terminate()
        // being called because of multiple exceptions it is best to ignore
        // these errors.
        try {
          // fill memory blocks with a pattern before freeing them
          if (fillDeallocations_) {
            alpaka::memset(*block.queue, *block.buffer, fillDeallocationValue_);
          }
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() caught an alpaka error: " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }

  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
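    // The effective limit is the smaller of the explicit byte limit and the given fraction of the total device
    // memory; a value of zero disables the corresponding criterion, and if both are disabled the cache size is
    // unlimited.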
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }

    // return (bin, bin size)
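    // For example, with illustrative values binGrowth_ == 2 and minBin_ == 10 (i.e. minBinBytes_ == 1 KiB),
    // a request of 1500 bytes falls into bin 11 and is served by a 2 KiB allocation.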
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching detail with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }

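    // Try to satisfy the request from the cached blocks in the same size bin: a cached block can be reused either
    // if its event has completed (the work that last used it has finished), or, when reuseSameQueueAllocations_ is
    // enabled, if it was last associated to the same queue as the new request.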
    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

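    // Allocate a buffer on the "memory device": regular device memory when the memory and synchronisation devices
    // coincide, or pinned (mapped) host memory registered with the queue's platform when the memory device is the
    // host CPU.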
    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        using Platform = alpaka::Platform<alpaka::Dev<Queue>>;
        return alpaka::allocMappedBuf<Platform, std::byte, size_t>(device_, platform<Platform>(), bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or "
                      "be the host CPU.");
      }
    }

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // throw an exception if it fails again
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // Set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // Geometric growth factor for bin-sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // Maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;

    const bool fillAllocations_;
    const uint8_t fillAllocationValue_;
    const bool fillReallocations_;
    const uint8_t fillReallocationValue_;
    const bool fillDeallocations_;
    const uint8_t fillDeallocationValue_;
    const bool fillCaches_;
    const uint8_t fillCacheValue_;
  };

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h