
CachingAllocator.h
#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"
// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }
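
    // For example, power(2, 10) == 1024 and power(2, 30) == 1073741824, so the bin sizes
    // binGrowth^bin used below grow geometrically with the bin index.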

    // format a memory size in B/kB/MB/GB
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1 << 30) and value % (1 << 30) == 0) {
        return std::to_string(value >> 30) + " GB";
      } else if (value >= (1 << 20) and value % (1 << 20) == 0) {
        return std::to_string(value >> 20) + " MB";
      } else if (value >= (1 << 10) and value % (1 << 10) == 0) {
        return std::to_string(value >> 10) + " kB";
      } else {
        return std::to_string(value) + " B";
      }
    }
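
    // For example, as_bytes(2048) returns "2 kB", as_bytes(3 << 20) returns "3 MB", as_bytes(1000)
    // returns "1000 B", and std::numeric_limits<size_t>::max() is reported as "unlimited".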

  }  // namespace detail

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged for
   * the lifetime of the allocator.
   *
   * Each allocation is associated to an event on a queue, which identifies the "synchronisation device" according to
   * which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how to handle different queue and event types in a single allocator? Store and access type-punned
   * queues and events? Or template the internal structures on them, but with a common base class?
   * alpaka relies on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator's memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is usually always the same (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and can
   *     be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   */
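
  /*
   * A minimal usage sketch, for illustration only: `device` and `queue` are assumed to be an
   * already constructed alpaka device and queue of the matching types, and the parameter values
   * are arbitrary examples rather than the framework defaults.
   *
   *   CachingAllocator<Device, Queue> allocator(device,
   *                                             2,       // binGrowth: bins are powers of two
   *                                             8,       // minBin: 2^8 = 256 B
   *                                             30,      // maxBin: 2^30 = 1 GB
   *                                             0,       // maxCachedBytes: no absolute limit
   *                                             0.5,     // maxCachedFraction: at most half of the device memory
   *                                             true,    // reuseSameQueueAllocations
   *                                             false);  // debug
   *
   *   void* ptr = allocator.allocate(1000, queue);  // rounded up to the 1 kB bin
   *   // ... use the memory in work submitted to `queue` ...
   *   allocator.free(ptr);                          // typically recached for later reuse
   */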

  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be the "
                  "host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };
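
    // Note: `requested` never exceeds `live`, because each live allocation is accounted at its bin
    // size, which is at least the requested size; `free` never exceeds maxCachedBytes_, because a
    // returned block that would push it past the limit is released instead of being recached.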

    CachingAllocator(
        Device const& device,
        unsigned int binGrowth,    // bin growth factor;
        unsigned int minBin,       // smallest bin, corresponds to binGrowth^minBin bytes;
                                   // smaller allocations are rounded to this value;
        unsigned int maxBin,       // largest bin, corresponds to binGrowth^maxBin bytes;
                                   // larger allocations will fail;
        size_t maxCachedBytes,     // total storage for the allocator (0 means no limit);
        double maxCachedFraction,  // fraction of total device memory taken for the allocator (0 means no limit);
                                   // if both maxCachedBytes and maxCachedFraction are non-zero,
                                   // the smallest resulting value is used.
        bool reuseSameQueueAllocations,  // reuse non-ready allocations if they are in the same queue as the new one;
                                         // this is safe only if all memory operations are scheduled in the same queue
        bool debug)
        : device_(device),
          binGrowth_(binGrowth),
          minBin_(minBin),
          maxBin_(maxBin),
          minBinBytes_(detail::power(binGrowth, minBin)),
          maxBinBytes_(detail::power(binGrowth, maxBin)),
          maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }
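
    // For example, binGrowth = 2, minBin = 8 and maxBin = 30 produce bins of 256 B, 512 B, 1 kB, ...
    // up to 1 GB: requests smaller than 256 B are rounded up to 256 B, and requests larger than 1 GB
    // throw a std::runtime_error (see findBin() below).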

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // allocate the given number of bytes on the current device, associated to the given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (not tryReuseCachedBlock(block)) {
        allocateNewBlock(block);
      }

      return block.buffer->data();
    }

    // frees an allocation
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        alpaka::enqueue(*(block.queue), *(block.event));
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << ".\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << ".\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }
  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }
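
    // For example, with maxCachedBytes = 1 GB and maxCachedFraction = 0.5 on a device reporting 8 GB
    // of memory, the cache is limited to min(1 GB, 4 GB) = 1 GB; if both parameters are 0, the cache
    // size is unlimited.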

    // return (bin, bin size)
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching allocator with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }
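
    // For example, with binGrowth = 2 and minBin = 8 (256 B), a request of 5000 bytes falls into
    // bin 13 and is served by an 8192-byte (8 kB) buffer.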

    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << ", event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        return alpaka::allocMappedBuf<alpaka::Pltf<alpaka::Dev<Queue>>, std::byte, size_t>(device_, bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                      "the host CPU.");
      }
    }
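
    // In common use case #1 (device memory) the first branch is taken; in common use case #2
    // (pinned host memory) the second branch allocates host memory that devices of the queue's
    // platform can access directly.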

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // throw an exception if it fails again
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes) associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // Set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // Geometric growth factor for bin-sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // Maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;
  };

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h