
CachingAllocator.h
#ifndef HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h
#define HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h

#include <cassert>
#include <exception>
#include <iomanip>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>

#include <alpaka/alpaka.hpp>

#include "HeterogeneousCore/AlpakaInterface/interface/AlpakaServiceFwd.h"
#include "HeterogeneousCore/AlpakaInterface/interface/devices.h"

// Inspired by cub::CachingDeviceAllocator

namespace cms::alpakatools {

  namespace detail {

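    // integer exponentiation by squaring, used to compute the bin sizes as binGrowth^bin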
    inline constexpr unsigned int power(unsigned int base, unsigned int exponent) {
      unsigned int power = 1;
      while (exponent > 0) {
        if (exponent & 1) {
          power = power * base;
        }
        base = base * base;
        exponent = exponent >> 1;
      }
      return power;
    }

    // format a memory size in B/kB/MB/GB
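    // e.g. as_bytes(1024) returns "1 kB" and as_bytes(1 << 30) returns "1 GB"; values that are not an exact
    // multiple of 1 kB, 1 MB or 1 GB are reported in bytes, e.g. as_bytes(1500) returns "1500 B"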
    inline std::string as_bytes(size_t value) {
      if (value == std::numeric_limits<size_t>::max()) {
        return "unlimited";
      } else if (value >= (1 << 30) and value % (1 << 30) == 0) {
        return std::to_string(value >> 30) + " GB";
      } else if (value >= (1 << 20) and value % (1 << 20) == 0) {
        return std::to_string(value >> 20) + " MB";
      } else if (value >= (1 << 10) and value % (1 << 10) == 0) {
        return std::to_string(value >> 10) + " kB";
      } else {
        return std::to_string(value) + " B";
      }
    }

  }  // namespace detail

  /*
   * The "memory device" identifies the memory space, i.e. the device where the memory is allocated.
   * A caching allocator object is associated to a single memory `Device`, set at construction time, and unchanged
   * for the lifetime of the allocator.
   *
   * Each allocation is associated to an event on a queue that identifies the "synchronisation device" according to
   * which the synchronisation occurs.
   * The `Event` type depends only on the synchronisation `Device` type.
   * The `Queue` type depends on the synchronisation `Device` type and the queue properties, either `Sync` or `Async`.
   *
   * **Note**: how should different queue and event types be handled in a single allocator? Store and access
   * type-punned queues and events? Or template the internal structures on them, but with a common base class?
   * alpaka does rely on the compile-time type for dispatch.
   *
   * Common use case #1: accelerator memory allocations
   *   - the "memory device" is the accelerator device (e.g. a GPU);
   *   - the "synchronisation device" is the same accelerator device;
   *   - the `Queue` type is always the same (either `Sync` or `Async`).
   *
   * Common use case #2: pinned host memory allocations
   *   - the "memory device" is the host device (e.g. system memory);
   *   - the "synchronisation device" is the accelerator device (e.g. a GPU) whose work queue will access the host
   *     memory (direct memory access from the accelerator, or scheduling `alpaka::memcpy`/`alpaka::memset`), and it
   *     can be different for each allocation;
   *   - the synchronisation `Device` _type_ could potentially be different, but memory pinning is currently tied to
   *     the accelerator's platform (CUDA, HIP, etc.), so the device type needs to be fixed to benefit from caching;
   *   - the `Queue` type can be either `Sync` _or_ `Async` on any allocation.
   */
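  /*
   * Illustrative usage sketch (not part of this interface): the concrete device and queue types, the way they are
   * obtained, and the parameter values below are examples only, chosen for a host-memory allocator.
   *
   *   using Device = alpaka::DevCpu;
   *   using Queue = alpaka::Queue<Device, alpaka::NonBlocking>;
   *
   *   Device device = alpaka::getDevByIdx(alpaka::PlatformCpu{}, 0u);  // or the CMSSW helper for the host device
   *   Queue queue{device};
   *
   *   CachingAllocator<Device, Queue> allocator(device,
   *                                             2,       // binGrowth: bins are powers of two
   *                                             8,       // minBin:  2^8  = 256 B
   *                                             30,      // maxBin:  2^30 = 1 GB
   *                                             0,       // maxCachedBytes: no absolute limit
   *                                             0.8,     // maxCachedFraction: cache at most 80% of the device memory
   *                                             false,   // reuseSameQueueAllocations
   *                                             false);  // debug
   *
   *   void* ptr = allocator.allocate(1000, queue);  // rounded up to the 1024 B bin
   *   // ... use the memory, e.g. as source or destination of an alpaka::memcpy submitted to `queue` ...
   *   allocator.free(ptr);  // cached for reuse once the event recorded on the associated queue completes
   */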

  template <typename TDev, typename TQueue>
  class CachingAllocator {
  public:
#ifdef ALPAKA_ACC_GPU_CUDA_ENABLED
    friend class alpaka_cuda_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_GPU_HIP_ENABLED
    friend class alpaka_rocm_async::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
    friend class alpaka_serial_sync::AlpakaService;
#endif
#ifdef ALPAKA_ACC_CPU_B_TBB_T_SEQ_ENABLED
    friend class alpaka_tbb_async::AlpakaService;
#endif

    using Device = TDev;                 // the "memory device", where the memory will be allocated
    using Queue = TQueue;                // the queue used to submit the memory operations
    using Event = alpaka::Event<Queue>;  // the events used to synchronise the operations
    using Buffer = alpaka::Buf<Device, std::byte, alpaka::DimInt<1u>, size_t>;

    // The "memory device" type can either be the same as the "synchronisation device" type, or be the host CPU.
    static_assert(alpaka::isDevice<Device>, "TDev should be an alpaka Device type.");
    static_assert(alpaka::isQueue<Queue>, "TQueue should be an alpaka Queue type.");
    static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                  "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or be "
                  "the host CPU.");

    struct CachedBytes {
      size_t free = 0;       // total bytes freed and cached on this device
      size_t live = 0;       // total bytes currently in use on this device
      size_t requested = 0;  // total bytes requested and currently in use on this device
    };

    CachingAllocator(
        Device const& device,
        unsigned int binGrowth,    // bin growth factor;
        unsigned int minBin,       // smallest bin, corresponds to binGrowth^minBin bytes;
                                   // smaller allocations are rounded to this value;
        unsigned int maxBin,       // largest bin, corresponds to binGrowth^maxBin bytes;
                                   // larger allocations will fail;
        size_t maxCachedBytes,     // total storage for the allocator (0 means no limit);
        double maxCachedFraction,  // fraction of total device memory taken for the allocator (0 means no limit);
                                   // if both maxCachedBytes and maxCachedFraction are non-zero,
                                   // the smallest resulting value is used.
        bool reuseSameQueueAllocations,  // reuse non-ready allocations if they are in the same queue as the new one;
                                         // this is safe only if all memory operations are scheduled in the same queue
        bool debug)
        : device_(device),
          binGrowth_(binGrowth),
          minBin_(minBin),
          maxBin_(maxBin),
          minBinBytes_(detail::power(binGrowth, minBin)),
          maxBinBytes_(detail::power(binGrowth, maxBin)),
          maxCachedBytes_(cacheSize(maxCachedBytes, maxCachedFraction)),
          reuseSameQueueAllocations_(reuseSameQueueAllocations),
          debug_(debug) {
      if (debug_) {
        std::ostringstream out;
        out << "CachingAllocator settings\n"
            << "  bin growth " << binGrowth_ << "\n"
            << "  min bin    " << minBin_ << "\n"
            << "  max bin    " << maxBin_ << "\n"
            << "  resulting bins:\n";
        for (auto bin = minBin_; bin <= maxBin_; ++bin) {
          auto binSize = detail::power(binGrowth, bin);
          out << "    " << std::right << std::setw(12) << detail::as_bytes(binSize) << '\n';
        }
        out << "  maximum amount of cached memory: " << detail::as_bytes(maxCachedBytes_);
        std::cout << out.str() << std::endl;
      }
    }
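    // For example, with binGrowth = 2, minBin = 8 and maxBin = 30 the bins span 256 B (2^8) to 1 GB (2^30):
    // each request is rounded up to the next bin size, and requests larger than the largest bin throw an exception.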

    ~CachingAllocator() {
      {
        // this should never be called while some memory blocks are still live
        std::scoped_lock lock(mutex_);
        assert(liveBlocks_.empty());
        assert(cachedBytes_.live == 0);
      }

      freeAllCached();
    }

    // return a copy of the cache allocation status, for monitoring purposes
    CachedBytes cacheStatus() const {
      std::scoped_lock lock(mutex_);
      return cachedBytes_;
    }

    // allocate the given number of bytes on the memory device, associated to the given queue
    void* allocate(size_t bytes, Queue queue) {
      // create a block descriptor for the requested allocation
      BlockDescriptor block;
      block.queue = std::move(queue);
      block.requested = bytes;
      std::tie(block.bin, block.bytes) = findBin(bytes);

      // try to re-use a cached block, or allocate a new buffer
      if (not tryReuseCachedBlock(block)) {
        allocateNewBlock(block);
      }

      return block.buffer->data();
    }

    // frees an allocation
    void free(void* ptr) {
      std::scoped_lock lock(mutex_);

      auto iBlock = liveBlocks_.find(ptr);
      if (iBlock == liveBlocks_.end()) {
        std::stringstream ss;
        ss << "Trying to free a non-live block at " << ptr;
        throw std::runtime_error(ss.str());
      }
      // remove the block from the list of live blocks
      BlockDescriptor block = std::move(iBlock->second);
      liveBlocks_.erase(iBlock);
      cachedBytes_.live -= block.bytes;
      cachedBytes_.requested -= block.requested;

      bool recache = (cachedBytes_.free + block.bytes <= maxCachedBytes_);
      if (recache) {
        // If enqueuing the event fails, very likely an error has occurred in the asynchronous processing.
        // In that case the error will show up in all device API function calls, and the free() will be
        // called by destructors during stack unwinding. In order to avoid terminate() being called
        // because of multiple exceptions it is best to ignore these errors.
        try {
          alpaka::enqueue(*(block.queue), *(block.event));
        } catch (std::exception& e) {
          if (debug_) {
            std::ostringstream out;
            out << "CachingAllocator::free() error from alpaka::enqueue(): " << e.what() << "\n";
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
                << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
                << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size()
                << " available blocks cached (" << cachedBytes_.free << " bytes), " << liveBlocks_.size()
                << " live blocks (" << cachedBytes_.live << " bytes) outstanding." << std::endl;
            std::cout << out.str() << std::endl;
          }
          return;
        }
        cachedBytes_.free += block.bytes;
        // after the call to insert(), cachedBlocks_ shares ownership of the buffer
        // TODO use std::move ?
        cachedBlocks_.insert(std::make_pair(block.bin, block));

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " returned " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      } else {
        // if the buffer is not recached, it is automatically freed when block goes out of scope
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << block.bytes << " bytes at "
              << ptr << " from associated queue " << block.queue->m_spQueueImpl.get() << ", event "
              << block.event->m_spEventImpl.get() << " .\n\t\t " << cachedBlocks_.size() << " available blocks cached ("
              << cachedBytes_.free << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live
              << " bytes) outstanding." << std::endl;
          std::cout << out.str() << std::endl;
        }
      }
    }

  private:
    struct BlockDescriptor {
      std::optional<Buffer> buffer;
      std::optional<Queue> queue;
      std::optional<Event> event;
      size_t bytes = 0;
      size_t requested = 0;  // for monitoring only
      unsigned int bin = 0;

      // the "synchronisation device" for this block
      auto device() { return alpaka::getDev(*queue); }
    };

  private:
    // return the maximum amount of memory that should be cached on this device
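    // e.g. with maxCachedBytes = 0 and maxCachedFraction = 0.8 on a device with 16 GB of memory, the
    // resulting limit is 12.8 GB; when both limits are non-zero, the smaller of the two values is used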
    size_t cacheSize(size_t maxCachedBytes, double maxCachedFraction) const {
      // note that getMemBytes() returns 0 if the platform does not support querying the device memory
      size_t totalMemory = alpaka::getMemBytes(device_);
      size_t memoryFraction = static_cast<size_t>(maxCachedFraction * totalMemory);
      size_t size = std::numeric_limits<size_t>::max();
      if (maxCachedBytes > 0 and maxCachedBytes < size) {
        size = maxCachedBytes;
      }
      if (memoryFraction > 0 and memoryFraction < size) {
        size = memoryFraction;
      }
      return size;
    }

    // return (bin, bin size)
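    // e.g. with binGrowth = 2 and minBin = 8, a request of 1000 bytes maps to bin 10 (1024 bytes),
    // while any request up to 256 bytes maps to the smallest bin (bin 8, 256 bytes)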
    std::tuple<unsigned int, size_t> findBin(size_t bytes) const {
      if (bytes < minBinBytes_) {
        return std::make_tuple(minBin_, minBinBytes_);
      }
      if (bytes > maxBinBytes_) {
        throw std::runtime_error("Requested allocation size " + std::to_string(bytes) +
                                 " bytes is too large for the caching allocator with maximum bin " +
                                 std::to_string(maxBinBytes_) +
                                 " bytes. You might want to increase the maximum bin size");
      }
      unsigned int bin = minBin_;
      size_t binBytes = minBinBytes_;
      while (binBytes < bytes) {
        ++bin;
        binBytes *= binGrowth_;
      }
      return std::make_tuple(bin, binBytes);
    }

    bool tryReuseCachedBlock(BlockDescriptor& block) {
      std::scoped_lock lock(mutex_);

      // iterate through the range of cached blocks in the same bin
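      // a cached block can be reused either if it was freed on the same queue as the new request (when
      // reuseSameQueueAllocations_ is enabled), or if the event recorded when it was freed has completed,
      // meaning that all the work previously using that memory has finished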
      const auto [begin, end] = cachedBlocks_.equal_range(block.bin);
      for (auto iBlock = begin; iBlock != end; ++iBlock) {
        if ((reuseSameQueueAllocations_ and (*block.queue == *(iBlock->second.queue))) or
            alpaka::isComplete(*(iBlock->second.event))) {
          // associate the cached buffer to the new queue
          auto queue = std::move(*(block.queue));
          // TODO cache (or remove) the debug information and use std::move()
          block = iBlock->second;
          block.queue = std::move(queue);

          // if the new queue is on a different device than the old event, create a new event
          if (block.device() != alpaka::getDev(*(block.event))) {
            block.event = Event{block.device()};
          }

          // insert the cached block into the live blocks
          // TODO cache (or remove) the debug information and use std::move()
          liveBlocks_[block.buffer->data()] = block;

          // update the accounting information
          cachedBytes_.free -= block.bytes;
          cachedBytes_.live += block.bytes;
          cachedBytes_.requested += block.requested;

          if (debug_) {
            std::ostringstream out;
            out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " reused cached block at "
                << block.buffer->data() << " (" << block.bytes << " bytes) for queue "
                << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get()
                << " (previously associated with queue " << iBlock->second.queue->m_spQueueImpl.get() << ", event "
                << iBlock->second.event->m_spEventImpl.get() << ")." << std::endl;
            std::cout << out.str() << std::endl;
          }

          // remove the reused block from the list of cached blocks
          cachedBlocks_.erase(iBlock);
          return true;
        }
      }

      return false;
    }

    Buffer allocateBuffer(size_t bytes, Queue const& queue) {
      if constexpr (std::is_same_v<Device, alpaka::Dev<Queue>>) {
        // allocate device memory
        return alpaka::allocBuf<std::byte, size_t>(device_, bytes);
      } else if constexpr (std::is_same_v<Device, alpaka::DevCpu>) {
        // allocate pinned host memory accessible by the queue's platform
        using Platform = alpaka::Platform<alpaka::Dev<Queue>>;
        return alpaka::allocMappedBuf<Platform, std::byte, size_t>(device_, platform<Platform>(), bytes);
      } else {
        // unsupported combination
        static_assert(std::is_same_v<Device, alpaka::Dev<Queue>> or std::is_same_v<Device, alpaka::DevCpu>,
                      "The \"memory device\" type can either be the same as the \"synchronisation device\" type, or "
                      "be the host CPU.");
      }
    }

    void allocateNewBlock(BlockDescriptor& block) {
      try {
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      } catch (std::runtime_error const& e) {
        // the allocation attempt failed: free all cached blocks on the device and retry
        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " failed to allocate " << block.bytes
              << " bytes for queue " << block.queue->m_spQueueImpl.get()
              << ", retrying after freeing cached allocations" << std::endl;
          std::cout << out.str() << std::endl;
        }
        // TODO implement a method that frees only up to block.bytes bytes
        freeAllCached();

        // if the allocation fails again, let the exception propagate
        block.buffer = allocateBuffer(block.bytes, *block.queue);
      }

      // create a new event associated to the "synchronisation device"
      block.event = Event{block.device()};

      {
        std::scoped_lock lock(mutex_);
        cachedBytes_.live += block.bytes;
        cachedBytes_.requested += block.requested;
        // TODO use std::move() ?
        liveBlocks_[block.buffer->data()] = block;
      }

      if (debug_) {
        std::ostringstream out;
        out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " allocated new block at "
            << block.buffer->data() << " (" << block.bytes << " bytes) associated with queue "
            << block.queue->m_spQueueImpl.get() << ", event " << block.event->m_spEventImpl.get() << "." << std::endl;
        std::cout << out.str() << std::endl;
      }
    }

    void freeAllCached() {
      std::scoped_lock lock(mutex_);

      while (not cachedBlocks_.empty()) {
        auto iBlock = cachedBlocks_.begin();
        cachedBytes_.free -= iBlock->second.bytes;

        if (debug_) {
          std::ostringstream out;
          out << "\t" << deviceType_ << " " << alpaka::getName(device_) << " freed " << iBlock->second.bytes
              << " bytes.\n\t\t " << (cachedBlocks_.size() - 1) << " available blocks cached (" << cachedBytes_.free
              << " bytes), " << liveBlocks_.size() << " live blocks (" << cachedBytes_.live << " bytes) outstanding."
              << std::endl;
          std::cout << out.str() << std::endl;
        }

        cachedBlocks_.erase(iBlock);
      }
    }

    // TODO replace with a tbb::concurrent_multimap ?
    using CachedBlocks = std::multimap<unsigned int, BlockDescriptor>;  // ordered by the allocation bin
    // TODO replace with a tbb::concurrent_map ?
    using BusyBlocks = std::map<void*, BlockDescriptor>;  // ordered by the address of the allocated memory

    inline static const std::string deviceType_ = alpaka::core::demangled<Device>;

    mutable std::mutex mutex_;
    Device device_;  // the device where the memory is allocated

    CachedBytes cachedBytes_;
    CachedBlocks cachedBlocks_;  // Set of cached device allocations available for reuse
    BusyBlocks liveBlocks_;      // map of pointers to the live device allocations currently in use

    const unsigned int binGrowth_;  // Geometric growth factor for bin-sizes
    const unsigned int minBin_;
    const unsigned int maxBin_;

    const size_t minBinBytes_;
    const size_t maxBinBytes_;
    const size_t maxCachedBytes_;  // Maximum aggregate cached bytes per device

    const bool reuseSameQueueAllocations_;
    const bool debug_;
  };

}  // namespace cms::alpakatools

#endif  // HeterogeneousCore_AlpakaInterface_interface_CachingAllocator_h