CachingAllocator.h

#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"
#include "HeterogeneousCore/AlpakaInterface/interface/devices.h"

// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }
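
    // Illustration (not part of the original file): with a growth factor of 2 the size of
    // bin N is power(2, N) = 2^N bytes. The values below are compile-time checks of this
    // relation for a few arbitrarily chosen bins.
    static_assert(power(2, 10) == 1024);     // bin 10, growth 2 -> 1 kB
    static_assert(power(2, 20) == 1048576);  // bin 20, growth 2 -> 1 MB
    static_assert(power(8, 7) == 2097152);   // bin  7, growth 8 -> 2 MB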

    // format a memory size in B/kB/MB/GB
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1 << 30) and value % (1 << 30) == 0) {
        return std::to_string(value >> 30) + " GB";
      } else if (value >= (1 << 20) and value % (1 << 20) == 0) {
        return std::to_string(value >> 20) + " MB";
      } else if (value >= (1 << 10) and value % (1 << 10) == 0) {
        return std::to_string(value >> 10) + " kB";
      } else {
        return std::to_string(value) + " B";
      }
    }
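
    // Illustration (not part of the original file) of the resulting formatting:
    //   as_bytes(512)                                 -> "512 B"
    //   as_bytes(1 << 20)                             -> "1 MB"
    //   as_bytes(1500)                                -> "1500 B"   (not an exact multiple of 1 kB)
    //   as_bytes(std::numeric_limits<size_t>::max())  -> "unlimited"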

  }  // namespace detail

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged for
   * the lifetime of the allocator.
   *
   * Each allocation is associated to an event on a queue, which identifies the "synchronisation device" according to
   * which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how to handle different queue and event types in a single allocator? Store and access type-punned
   * queues and events? Or template the internal structures on them, but with a common base class?
   * alpaka does rely on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator's memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is usually the same for all allocations (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
   *     be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   *
   * (An illustrative usage sketch for use case #1 is given after the class definition below.)
   */

  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
                  "host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };
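
    // Illustration (not part of the original file): after allocating a single 100 000 byte
    // block that gets rounded up to a 131 072 byte bin, the counters read
    //   free = 0, live = 131072, requested = 100000   while the block is in use, and
    //   free = 131072, live = 0, requested = 0        once it has been returned to the cache.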

    explicit CachingAllocator(
        Device const& device,
        unsigned int binGrowth,          // bin growth factor;
        unsigned int minBin,             // smallest bin, corresponds to binGrowth^minBin bytes;
                                         // smaller allocations are rounded to this value;
        unsigned int maxBin,             // largest bin, corresponds to binGrowth^maxBin bytes;
                                         // larger allocations will fail;
        size_t maxCachedBytes,           // total storage for the allocator (0 means no limit);
        double maxCachedFraction,        // fraction of total device memory taken for the allocator (0 means no limit);
                                         // if both maxCachedBytes and maxCachedFraction are non-zero,
                                         // the smallest resulting value is used.
        bool reuseSameQueueAllocations,  // reuse non-ready allocations if they are in the same queue as the new one;
                                         // this is safe only if all memory operations are scheduled in the same queue
        bool debug)
        : device_(device),
          binGrowth_(binGrowth),
          minBin_(minBin),
          maxBin_(maxBin),
          minBinBytes_(detail::power(binGrowth, minBin)),
          maxBinBytes_(detail::power(binGrowth, maxBin)),
          maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // Allocate the given number of bytes on the current device, associated to the given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (not tryReuseCachedBlock(block)) {
        allocateNewBlock(block);
      }

      return block.buffer->data();
    }

    // frees an allocation
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        // If enqueuing the event fails, very likely an error has
        // occurred in the asynchronous processing. In that case the
        // error will show up in all device API function calls, and
        // the free() will be called by destructors during stack
        // unwinding. In order to avoid terminate() being called
        // because of multiple exceptions it is best to ignore these
        // errors.
        try {
          alpaka::enqueue(*(block.queue), *(block.event));
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() error from alpaka::enqueue(): " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << " , event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }

  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }
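
    // Illustration (not part of the original file): on a device with 16 GB of memory,
    // maxCachedBytes = 256 MB and maxCachedFraction = 0.5 give candidate limits of 256 MB
    // and 8 GB, and the smaller one (256 MB) wins; if both parameters are zero the cache
    // size is unlimited (std::numeric_limits<size_t>::max()).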

    // return (bin, bin size)
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching allocator with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }
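
    // Illustration (not part of the original file): with binGrowth_ = 2, minBin_ = 8 and
    // maxBin_ = 30, a request of 100 000 bytes falls in bin 17 and is rounded up to
    // 2^17 = 131072 bytes, while a request of 16 bytes is rounded up to the smallest bin,
    // 2^8 = 256 bytes; a request above 2^30 bytes throws.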

    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << " , event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        using Platform = alpaka::Platform<alpaka::Dev<Queue>>;
        return alpaka::allocMappedBuf<Platform, std::byte, size_t>(device_, platform<Platform>(), bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                      "the host CPU.");
      }
    }

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // throw an exception if it fails again
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // Set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // Geometric growth factor for bin sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // Maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;
  };
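
  // Illustrative sketch, not part of the original interface: how a caching allocator for
  // the "common use case #1" above (device memory, synchronised on the same device) might
  // be created and used. The helper function name and all parameter values are examples only.
  template <typename TDev, typename TQueue>
  void cachingAllocatorExample(TDev const& device, TQueue& queue) {
    CachingAllocator<TDev, TQueue> allocator(device,
                                             2,       // binGrowth: bin sizes are powers of 2
                                             8,       // minBin: 2^8 = 256 B
                                             30,      // maxBin: 2^30 = 1 GB
                                             0,       // maxCachedBytes: no absolute limit
                                             0.8,     // maxCachedFraction: at most 80% of the device memory
                                             true,    // reuseSameQueueAllocations
                                             false);  // debug
    // the request is rounded up to a bin (here 1 MB) and associated to the given queue
    void* ptr = allocator.allocate(1000000, queue);
    // ... submit work using ptr to the same queue ...
    // return the memory to the cache; it becomes reusable once the queue reaches this point
    allocator.free(ptr);
  }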

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h