CMS 3D CMS Logo

CachingAllocator.h
Go to the documentation of this file.
1 #ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
2 #define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
3 
#include <cassert>
#include <cstdint>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>
15 
16 #include <alpaka/alpaka.hpp>
17 
21 
22 // Inspired by cub::CachingDeviceAllocator
23 
24 namespace cms::alpakatools {
25 
26  namespace detail {
27 
28  inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
29  unsigned int power = 1;
30  while (exponent > 0) {
31  if (exponent & 1) {
32  power = power * base;
33  }
34  base = base * base;
35  exponent = exponent >> 1;
36  }
37  return power;
38  }
39 
40  // format a memory size in B/KiB/MiB/GiB/TiB
41  inline std::string as_bytes(size_t value) {
43  return "unlimited";
44  } else if (value >= (1ul << 40) and value % (1ul << 40) == 0) {
45  return std::to_string(value >> 40) + " TiB";
46  } else if (value >= (1ul << 30) and value % (1ul << 30) == 0) {
47  return std::to_string(value >> 30) + " GiB";
48  } else if (value >= (1ul << 20) and value % (1ul << 20) == 0) {
49  return std::to_string(value >> 20) + " MiB";
50  } else if (value >= (1ul << 10) and value % (1ul << 10) == 0) {
51  return std::to_string(value >> 10) + " KiB";
52  } else {
53  return std::to_string(value) + " B";
54  }
55  }
56 
57  } // namespace detail
58 
59  /*
60  * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
61  * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged for
62  * the lifetime of the allocator.
63  *
64  * Each allocation is associated to an event on a queue, that identifies the "synchronisation device" according to
65  * which the synchronisation occurs.
66  * The `Event` type depends only on the synchronisation `Device` type.
67  * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
68  *
69  * **Note**: how to handle different queue and event types in a single allocator ? store and access type-punned
70  * queues and events ? or template the internal structures on them, but with a common base class ?
71  * alpaka does rely on the compile-time type for dispatch.
72  *
73  * Common use case #1: accelerator's memory allocations
74  * - the "memory device" is the accelerator device (e.g. a GPU);
75  * - the "synchronisation device" is the same accelerator device;
76  * - the `Queue` type is usually always the same (either `Sync` or `Async`).
77  *
78  * Common use case #2: pinned host memory allocations
79  * - the "memory device" is the host device (e.g. system memory);
80  * - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host;
81  * memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
82  * be different for each allocation;
83  * - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
84  * the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
85  * - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
86  */
87 
88  template <typename TDev, typename TQueue>
90  public:
91 #ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
92  friend class alpaka_cuda_async::AlpakaService;
93 #endif
94 #ifdef ALPAKA_ACC_GPU_HIP_ENABLED
95  friend class alpaka_rocm_async::AlpakaService;
96 #endif
97 #ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
98  friend class alpaka_serial_sync::AlpakaService;
99 #endif
100 #ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
101  friend class alpaka_tbb_async::AlpakaService;
102 #endif
103 
104  using Device = TDev; // the "memory device", where the memory will be allocated
105  using Queue = TQueue; // the queue used to submit the memory operations
106  using Event = alpaka::Event<Queue>; // the events used to synchronise the operations
107  using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;
108 
109  // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
110  static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
111  static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
112  static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
113  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
114  "host CPU.");
115 
116  struct CachedBytes {
117  size_t free = 0; // total bytes freed and cached on this device
118  size_t live = 0; // total bytes currently in use oin this device
119  size_t requested = 0; // total bytes requested and currently in use on this device
120  };
121 
123  Device const& device,
124  AllocatorConfig const& config,
125  bool reuseSameQueueAllocations, // reuse non-ready allocations if they are in the same queue as the new one;
126  // this is safe only if all memory operations are scheduled in the same queue
127  bool debug = false)
128  : device_(device),
135  reuseSameQueueAllocations_(reuseSameQueueAllocations),
136  debug_(debug),
137  fillAllocations_(config.fillAllocations),
138  fillAllocationValue_(config.fillAllocationValue),
139  fillReallocations_(config.fillReallocations),
140  fillReallocationValue_(config.fillReallocationValue),
141  fillDeallocations_(config.fillDeallocations),
142  fillDeallocationValue_(config.fillDeallocationValue),
143  fillCaches_(config.fillCaches),
144  fillCacheValue_(config.fillCacheValue) {
145  if (debug_) {
146  std::ostringstream out;
147  out << "CachingAllocator settings\n"
148  << " bin growth " << binGrowth_ << "\n"
149  << " min bin " << minBin_ << "\n"
150  << " max bin " << maxBin_ << "\n"
151  << " resulting bins:\n";
152  for (auto bin = minBin_; bin <= maxBin_; ++bin) {
153  auto binSize = detail::power(binGrowth_, bin);
154  out << " " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
155  }
156  out << " maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
157  std::cout << out.str() << std::endl;
158  }
159  }
160 
162  {
163  // this should never be called while some memory blocks are still live
164  std::scoped_lock lock(mutex_);
165  assert(liveBlocks_.empty());
166  assert(cachedBytes_.live == 0);
167  }
168 
169  freeAllCached();
170  }
171 
172  // return a copy of the cache allocation status, for monitoring purposes
174  std::scoped_lock lock(mutex_);
175  return cachedBytes_;
176  }
177 
178  // Allocate given number of bytes on the current device associated to given queue
179  void* allocate(size_t bytes, Queue queue) {
180  // create a block descriptor for the requested allocation
182  block.queue = std::move(queue);
183  block.requested = bytes;
184  std::tie(block.bin, block.bytes) = findBin(bytes);
185 
186  // try to re-use a cached block, or allocate a new buffer
187  if (tryReuseCachedBlock(block)) {
188  // fill the re-used memory block with a pattern
189  if (fillReallocations_) {
190  alpaka::memset(*block.queue, *block.buffer, fillReallocationValue_);
191  } else if (fillAllocations_) {
192  alpaka::memset(*block.queue, *block.buffer, fillAllocationValue_);
193  }
194  } else {
196  // fill the newly allocated memory block with a pattern
197  if (fillAllocations_) {
198  alpaka::memset(*block.queue, *block.buffer, fillAllocationValue_);
199  }
200  }
201 
202  return block.buffer->data();
203  }
204 
    // Frees an allocation previously returned by allocate().
    // The block is removed from the live set; if the cache limit allows it, the block is
    // returned to the per-bin cache (with an event enqueued on its queue so a later
    // tryReuseCachedBlock() can tell when the memory is safe to re-use), otherwise the
    // buffer is released when `block` goes out of scope.
    // Throws std::runtime_error if `ptr` does not correspond to a live allocation.
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      // cache the block only if doing so keeps the total cached size within the limit
      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        // If enqueuing the event fails, very likely an error has
        // occurred in the asynchronous processing. In that case the
        // error will show up in all device API function calls, and
        // the free() will be called by destructors during stack
        // unwinding. In order to avoid terminate() being called
        // because of multiple exceptions it is best to ignore these
        // errors.
        try {
          // fill memory blocks with a pattern before caching them
          if (fillCaches_) {
            alpaka::memset(*block.queue, *block.buffer, fillCacheValue_);
          } else if (fillDeallocations_) {
            alpaka::memset(*block.queue, *block.buffer, fillDeallocationValue_);
          }
          // record in the block a marker associated to the work queue
          alpaka::enqueue(*(block.queue), *(block.event));
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() caught an alpaka error: " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // If the memset fails, very likely an error has occurred in the
        // asynchronous processing. In that case the error will show up in all
        // device API function calls, and the free() will be called by
        // destructors during stack unwinding. In order to avoid terminate()
        // being called because of multiple exceptions it is best to ignore
        // these errors.
        try {
          // fill memory blocks with a pattern before freeing them
          if (fillDeallocations_) {
            alpaka::memset(*block.queue, *block.buffer, fillDeallocationValue_);
          }
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() caught an alpaka error: " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }
303 
304  private:
306  std::optional<Buffer> buffer;
307  std::optional<Queue> queue;
308  std::optional<Event> event;
309  size_t bytes = 0;
310  size_t requested = 0; // for monitoring only
311  unsigned int bin = 0;
312 
313  // the "synchronisation device" for this block
314  auto device() { return alpaka::getDev(*queue); }
315  };
316 
317  private:
318  // return the maximum amount of memory that should be cached on this device
319  size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
320  // note that getMemBytes() returns 0 if the platform does not support querying the device memory
321  size_t totalMemory = alpaka::getMemBytes(device_);
322  size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
324  if (maxCachedBytes > 0 and maxCachedBytes < size) {
326  }
327  if (memoryFraction > 0 and memoryFraction < size) {
328  size = memoryFraction;
329  }
330  return size;
331  }
332 
333  // return (bin, bin size)
334  std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
335  if (bytes < minBinBytes_) {
336  return std::make_tuple(minBin_, minBinBytes_);
337  }
338  if (bytes > maxBinBytes_) {
339  throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
340  " bytes is too large for the caching detail with maximum bin " +
342  " bytes. You might want to increase the maximum bin size");
343  }
344  unsigned int bin = minBin_;
345  size_t binBytes = minBinBytes_;
346  while (binBytes < bytes) {
347  ++bin;
348  binBytes *= binGrowth_;
349  }
350  return std::make_tuple(bin, binBytes);
351  }
352 
354  std::scoped_lock lock(mutex_);
355 
356  // iterate through the range of cached blocks in the same bin
357  const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
358  for (auto iBlock = begin; iBlock != end; ++iBlock) {
359  if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
360  alpaka::isComplete(*(iBlock->second.event))) {
361  // associate the cached buffer to the new queue
362  auto queue = std::move(*(block.queue));
363  // TODO cache (or remove) the debug information and use std::move()
364  block = iBlock->second;
365  block.queue = std::move(queue);
366 
367  // if the new queue is on different device than the old event, create a new event
368  if (block.device() != alpaka::getDev(*(block.event))) {
369  block.event = Event{block.device()};
370  }
371 
372  // insert the cached block into the live blocks
373  // TODO cache (or remove) the debug information and use std::move()
374  liveBlocks_[block.buffer->data()] = block;
375 
376  // update the accounting information
377  cachedBytes_.free -= block.bytes;
378  cachedBytes_.live += block.bytes;
379  cachedBytes_.requested += block.requested;
380 
381  if (debug_) {
382  std::ostringstream out;
383  out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
384  << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
385  << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
386  << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
387  << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
388  std::cout << out.str() << std::endl;
389  }
390 
391  // remove the reused block from the list of cached blocks
392  cachedBlocks_.erase(iBlock);
393  return true;
394  }
395  }
396 
397  return false;
398  }
399 
    // Allocate a new buffer of `bytes` bytes on the "memory device".
    // When the memory device matches the queue's device, plain device memory is
    // allocated; when the memory device is the host CPU, pinned host memory mapped
    // for the queue's platform is allocated instead.
    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        using Platform = alpaka::Platform<alpaka::Dev<Queue>>;
        return alpaka::allocMappedBuf<Platform, std::byte, size_t>(device_, platform<Platform>(), bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                      "the host CPU.");
      }
    }
415 
417  try {
418  block.buffer = allocateBuffer(block.bytes, *block.queue);
419  } catch (std::runtime_error const& e) {
420  // the allocation attempt failed: free all cached blocks on the device and retry
421  if (debug_) {
422  std::ostringstream out;
423  out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
424  << " bytes for queue " << block.queue->m_spQueueImpl.get()
425  << ", retrying after freeing cached allocations" << std::endl;
426  std::cout << out.str() << std::endl;
427  }
428  // TODO implement a method that frees only up to block.bytes bytes
429  freeAllCached();
430 
431  // throw an exception if it fails again
432  block.buffer = allocateBuffer(block.bytes, *block.queue);
433  }
434 
435  // create a new event associated to the "synchronisation device"
436  block.event = Event{block.device()};
437 
438  {
439  std::scoped_lock lock(mutex_);
440  cachedBytes_.live += block.bytes;
441  cachedBytes_.requested += block.requested;
442  // TODO use std::move() ?
443  liveBlocks_[block.buffer->data()] = block;
444  }
445 
446  if (debug_) {
447  std::ostringstream out;
448  out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
449  << block.buffer->data() << " (" << block.bytes << " bytes associated with queue "
450  << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
451  std::cout << out.str() << std::endl;
452  }
453  }
454 
    // Release every cached (non-live) memory block back to the device and reset the
    // corresponding accounting; live allocations are not touched.
    // The buffers are freed as each BlockDescriptor is erased from the map.
    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }
474 
475  // TODO replace with a tbb::concurrent_multimap ?
476  using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>; // ordered by the allocation bin
477  // TODO replace with a tbb::concurrent_map ?
478  using BusyBlocks = std::map<void*, BlockDescriptor>; // ordered by the address of the allocated memory
479 
480  inline static const std::string deviceType_ = alpaka::core::demangled<Device>;
481 
483  Device device_; // the device where the memory is allocated
484 
486  CachedBlocks cachedBlocks_; // Set of cached device allocations available for reuse
487  BusyBlocks liveBlocks_; // map of pointers to the live device allocations currently in use
488 
489  const unsigned int binGrowth_; // Geometric growth factor for bin-sizes
490  const unsigned int minBin_;
491  const unsigned int maxBin_;
492 
493  const size_t minBinBytes_;
494  const size_t maxBinBytes_;
495  const size_t maxCachedBytes_; // Maximum aggregate cached bytes per device
496 
498  const bool debug_;
499 
500  const bool fillAllocations_;
501  const uint8_t fillAllocationValue_;
502  const bool fillReallocations_;
503  const uint8_t fillReallocationValue_;
504  const bool fillDeallocations_;
505  const uint8_t fillDeallocationValue_;
506  const bool fillCaches_;
507  const uint8_t fillCacheValue_;
508  };
509 
510 } // namespace cms::alpakatools
511 
512 #endif // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
std::map< void *, BlockDescriptor > BusyBlocks
constexpr unsigned int maxBin
constexpr size_t maxCachedBytes
static std::mutex mutex
Definition: Proxy.cc:8
base
Main Program
Definition: newFWLiteAna.py:92
std::string as_bytes(size_t value)
void * allocate(size_t bytes, Queue queue)
Definition: config.py:1
assert(be >=bs)
bool tryReuseCachedBlock(BlockDescriptor &block)
static std::string to_string(const XMLCh *ch)
CachingAllocator(Device const &device, AllocatorConfig const &config, bool reuseSameQueueAllocations, bool debug=false)
Buffer allocateBuffer(size_t bytes, Queue const &queue)
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
Definition: value.py:1
size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const
constexpr unsigned int power(unsigned int base, unsigned int exponent)
#define debug
Definition: HDRShower.cc:19
std::tuple< unsigned int, size_t > findBin(size_t bytes) const
void allocateNewBlock(BlockDescriptor &block)
std::string getName(const G4String &)
Definition: ForwardName.cc:3
alpaka::Buf< Device, std::byte, alpaka::DimInt< 1u >, size_t > Buffer
static const std::string deviceType_
constexpr unsigned int binGrowth
def move(src, dest)
Definition: eostools.py:511
std::multimap< unsigned int, BlockDescriptor > CachedBlocks
constexpr unsigned int minBin
constexpr double maxCachedFraction