Make buffer manager aware of current device memory usage
- Limit amount of device memory that can be used (currently set to 95%)
- Detect out-of-memory errors and print helpful error message listing
  all currently allocated buffers and their sizes
- If resizing large buffer would exceed device memory, attempt to go
  through host first
- If accessing part of large buffer would result in resize that exceeds
  device memory, attempt to spill currently unused parts to the host
psalz committed Jun 22, 2023
1 parent 2c738c8 commit 79f97c2
Showing 7 changed files with 393 additions and 42 deletions.
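
The first two bullet points of the commit message boil down to best-effort bookkeeping: since SYCL offers no query for a device's current memory usage, the runtime counts its own allocations against a configurable fraction of info::device::global_mem_size and raises a descriptive error when the budget would be exceeded. The following is a minimal, self-contained sketch of that idea with illustrative names only; it is not the actual Celerity code, which appears in the diff below.

#include <cstddef>
#include <cstdio>
#include <stdexcept>
#include <string>

// Illustrative sketch of the bookkeeping described in the commit message.
// The real implementation lives in device_queue and buffer_manager (see the diff below).
class memory_budget {
  public:
    memory_budget(size_t global_mem_bytes, double max_usage = 0.95) : m_total(global_mem_bytes), m_max_usage(max_usage) {}

    // Best-effort check: only our own allocations are counted, optionally pretending
    // that assume_freed bytes have already been released (used for the spill logic).
    bool can_allocate(size_t bytes, size_t assume_freed = 0) const {
        return static_cast<double>(m_allocated - assume_freed + bytes) / static_cast<double>(m_total) < m_max_usage;
    }

    void on_alloc(size_t bytes) {
        if(!can_allocate(bytes)) {
            throw std::runtime_error("allocation of " + std::to_string(bytes) + " bytes exceeds device memory budget ("
                + std::to_string(m_allocated) + " of " + std::to_string(m_total) + " bytes already in use)");
        }
        m_allocated += bytes;
    }

    void on_free(size_t bytes) { m_allocated -= bytes; }

  private:
    size_t m_total;
    double m_max_usage;
    size_t m_allocated = 0;
};

int main() {
    memory_budget budget(1024); // pretend the device has 1 KiB of global memory
    budget.on_alloc(512);       // fine: 50% < 95%
    std::printf("can allocate another 512 bytes: %d\n", budget.can_allocate(512));      // 0: would hit 100%
    std::printf("...after freeing the first 512: %d\n", budget.can_allocate(512, 512)); // 1
    budget.on_free(512);
    return 0;
}
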
38 changes: 34 additions & 4 deletions include/buffer_manager.h
@@ -88,7 +88,7 @@ namespace detail {

using buffer_lifecycle_callback = std::function<void(buffer_lifecycle_event, buffer_id)>;

using device_buffer_factory = std::function<std::unique_ptr<buffer_storage>(const range<3>&, sycl::queue&)>;
using device_buffer_factory = std::function<std::unique_ptr<buffer_storage>(const range<3>&, device_queue&)>;
using host_buffer_factory = std::function<std::unique_ptr<buffer_storage>(const range<3>&)>;

struct buffer_info {
@@ -136,7 +136,7 @@ namespace detail {
std::unique_lock lock(m_mutex);
bid = m_buffer_count++;
m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{});
auto device_factory = [](const celerity::range<3>& r, sycl::queue& q) {
auto device_factory = [](const celerity::range<3>& r, device_queue& q) {
return std::make_unique<device_buffer_storage<DataT, Dims>>(range_cast<Dims>(r), q);
};
auto host_factory = [](const celerity::range<3>& r) { return std::make_unique<host_buffer_storage<DataT, Dims>>(range_cast<Dims>(r)); };
@@ -211,10 +211,19 @@ namespace detail {
*/
void set_buffer_data(buffer_id bid, const subrange<3>& sr, unique_payload_ptr in_linearized);

// Set the maximum fraction of global device memory to be used, in the interval (0, 1].
void set_max_device_global_memory_usage(const double max) {
assert(max > 0 && max <= 1);
m_max_device_global_mem_usage = max;
}

template <typename DataT, int Dims>
access_info access_device_buffer(buffer_id bid, access_mode mode, const subrange<Dims>& sr) {
#if defined(CELERITY_DETAIL_ENABLE_DEBUG)
assert((m_buffer_types.at(bid)->has_type<DataT, Dims>()));
{
std::unique_lock lock(m_mutex);
assert((m_buffer_types.at(bid)->has_type<DataT, Dims>()));
}
#endif
return access_device_buffer(bid, mode, subrange_cast<3>(sr));
}
@@ -224,7 +233,10 @@
template <typename DataT, int Dims>
access_info access_host_buffer(buffer_id bid, access_mode mode, const subrange<Dims>& sr) {
#if defined(CELERITY_DETAIL_ENABLE_DEBUG)
assert((m_buffer_types.at(bid)->has_type<DataT, Dims>()));
{
std::unique_lock lock(m_mutex);
assert((m_buffer_types.at(bid)->has_type<DataT, Dims>()));
}
#endif
return access_host_buffer(bid, mode, subrange_cast<3>(sr));
}
@@ -324,6 +336,8 @@ namespace detail {
};

private:
// Leave some memory for other processes.
double m_max_device_global_mem_usage = 0.95;
device_queue& m_queue;
buffer_lifecycle_callback m_lifecycle_cb;
size_t m_buffer_count = 0;
@@ -363,6 +377,22 @@ namespace detail {
return result;
}

// Implementation of access_host_buffer; does not lock the mutex (also called from access_device_buffer).
access_info access_host_buffer_impl(const buffer_id bid, const access_mode mode, const subrange<3>& sr);

/**
 * Returns whether an allocation of size_bytes can be made without exceeding m_max_device_global_mem_usage,
 * optionally assuming that assume_bytes_freed bytes have already been freed.
 *
 * NOTE: SYCL does not provide a way of querying a device's actual current memory usage, so this is only a best-effort guess.
 */
bool can_allocate(const size_t size_bytes, const size_t assume_bytes_freed = 0) const {
const auto total = m_queue.get_global_memory_total_size_bytes();
const auto current = m_queue.get_global_memory_allocated_bytes();
assert(assume_bytes_freed <= current);
return static_cast<double>(current - assume_bytes_freed + size_bytes) / static_cast<double>(total) < m_max_device_global_mem_usage;
}

/**
* Makes the contents of a backing buffer coherent within the range @p coherent_sr.
*
29 changes: 11 additions & 18 deletions include/buffer_storage.h
@@ -7,6 +7,7 @@
#include <CL/sycl.hpp>

#include "backend/backend.h"
#include "device_queue.h"
#include "payload.h"
#include "ranges.h"
#include "workaround.h"
@@ -31,16 +32,11 @@ namespace detail {
template <typename DataT, int Dims>
class device_buffer {
public:
device_buffer(const range<Dims>& range, sycl::queue& queue) : m_range(range), m_queue(queue) {
if(m_range.size() != 0) {
m_device_ptr = sycl::malloc_device<DataT>(m_range.size(), m_queue);
assert(m_device_ptr != nullptr);
}
device_buffer(const range<Dims>& range, device_queue& queue) : m_range(range), m_queue(queue) {
if(m_range.size() != 0) { m_device_allocation = m_queue.malloc<DataT>(m_range.size()); }
}

~device_buffer() {
if(m_range.size() != 0) { sycl::free(m_device_ptr, m_queue); }
}
~device_buffer() { m_queue.free(m_device_allocation); }

device_buffer(const device_buffer&) = delete;
device_buffer(device_buffer&&) noexcept = default;
@@ -49,16 +45,14 @@ namespace detail {

range<Dims> get_range() const { return m_range; }

DataT* get_pointer() { return m_device_ptr; }

const DataT* get_pointer() const { return m_device_ptr; }
DataT* get_pointer() { return static_cast<DataT*>(m_device_allocation.ptr); }

bool operator==(const device_buffer& rhs) const { return m_device_ptr == rhs.m_device_ptr && m_queue == rhs.m_queue && m_range == rhs.m_range; }
const DataT* get_pointer() const { return static_cast<DataT*>(m_device_allocation.ptr); }

private:
range<Dims> m_range;
sycl::queue m_queue;
DataT* m_device_ptr = nullptr;
device_queue& m_queue;
device_allocation m_device_allocation;
};

template <typename DataT, int Dims>
@@ -129,8 +123,9 @@ namespace detail {
template <typename DataT, int Dims>
class device_buffer_storage : public buffer_storage {
public:
device_buffer_storage(range<Dims> range, sycl::queue owning_queue)
: buffer_storage(range_cast<3>(range), buffer_type::device_buffer), m_owning_queue(std::move(owning_queue)), m_device_buf(range, m_owning_queue) {}
device_buffer_storage(range<Dims> range, device_queue& owning_queue)
: buffer_storage(range_cast<3>(range), buffer_type::device_buffer), m_owning_queue(owning_queue.get_sycl_queue()),
m_device_buf(range, owning_queue) {}

size_t get_size() const override { return get_range().size() * sizeof(DataT); };

@@ -164,8 +159,6 @@ namespace detail {

void copy(const buffer_storage& source, id<3> source_offset, id<3> target_offset, range<3> copy_range) override;

sycl::queue& get_owning_queue() { return m_owning_queue; }

private:
mutable sycl::queue m_owning_queue;
device_buffer<DataT, Dims> m_device_buf;
46 changes: 46 additions & 0 deletions include/device_queue.h
@@ -40,6 +40,16 @@ namespace detail {
}
#endif

struct device_allocation {
void* ptr = nullptr;
size_t size_bytes = 0;
};

class allocation_error : public std::runtime_error {
public:
allocation_error(const std::string& msg) : std::runtime_error(msg) {}
};

/**
* The @p device_queue wraps the actual SYCL queue and is used to submit kernels.
*/
@@ -68,6 +78,40 @@ namespace detail {
return evt;
}

template <typename T>
[[nodiscard]] device_allocation malloc(const size_t count) {
const size_t size_bytes = count * sizeof(T);
assert(m_sycl_queue != nullptr);
assert(m_global_mem_allocated_bytes + size_bytes < m_global_mem_total_size_bytes);
CELERITY_DEBUG("Allocating {} bytes on device", size_bytes);
T* ptr = nullptr;
try {
ptr = sycl::aligned_alloc_device<T>(alignof(T), count, *m_sycl_queue);
} catch(sycl::exception& e) {
CELERITY_CRITICAL("sycl::aligned_alloc_device failed with exception: {}", e.what());
ptr = nullptr;
}
if(ptr == nullptr) {
throw allocation_error(fmt::format("Allocation of {} bytes failed; likely out of memory. Currently allocated: {} out of {} bytes.",
count * sizeof(T), m_global_mem_allocated_bytes, m_global_mem_total_size_bytes));
}
m_global_mem_allocated_bytes += size_bytes;
return device_allocation{ptr, size_bytes};
}

void free(device_allocation alloc) {
assert(m_sycl_queue != nullptr);
assert(alloc.size_bytes <= m_global_mem_allocated_bytes);
assert(alloc.ptr != nullptr || alloc.size_bytes == 0);
CELERITY_DEBUG("Freeing {} bytes on device", alloc.size_bytes);
if(alloc.size_bytes != 0) { sycl::free(alloc.ptr, *m_sycl_queue); }
m_global_mem_allocated_bytes -= alloc.size_bytes;
}

size_t get_global_memory_total_size_bytes() const { return m_global_mem_total_size_bytes; }

size_t get_global_memory_allocated_bytes() const { return m_global_mem_allocated_bytes; }

/**
* @brief Waits until all currently submitted operations have completed.
*/
@@ -84,6 +128,8 @@ namespace detail {
}

private:
size_t m_global_mem_total_size_bytes = 0;
size_t m_global_mem_allocated_bytes = 0;
std::unique_ptr<cl::sycl::queue> m_sycl_queue;
bool m_device_profiling_enabled = false;

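
A hypothetical usage sketch for the allocation API added to device_queue above; the function name and the size are illustrative and not part of the commit, and an already-initialized device_queue is assumed.

#include "device_queue.h" // Celerity-internal header extended by this commit

// Illustrative only: how a runtime component could use the new tracking allocator.
// Assumes `dq` has already been initialized and owns a valid SYCL queue.
void allocate_scratch(celerity::detail::device_queue& dq) {
    // malloc<T>() records the allocation size and throws celerity::detail::allocation_error
    // with a diagnostic message if the underlying sycl::aligned_alloc_device call fails.
    celerity::detail::device_allocation scratch = dq.malloc<float>(1024 * 1024);
    // ... submit kernels that use static_cast<float*>(scratch.ptr) ...
    dq.free(scratch); // subtracts the bytes from the allocated counter again
}
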
87 changes: 77 additions & 10 deletions src/buffer_manager.cc
@@ -78,16 +78,79 @@ namespace detail {
auto& existing_buf = m_buffers[bid].device_buf;
backing_buffer replacement_buf;

const auto die = [&](const size_t allocation_size_bytes) {
std::string msg = fmt::format("Unable to allocate buffer {} of size {}.\n", bid, allocation_size_bytes);
fmt::format_to(std::back_inserter(msg), "\nCurrent allocations:\n");
size_t total_bytes = 0;
for(const auto& [bid, b] : m_buffers) {
if(b.device_buf.is_allocated()) {
fmt::format_to(std::back_inserter(msg), "\tBuffer {}: {} bytes\n", bid, b.device_buf.storage->get_size());
total_bytes += b.device_buf.storage->get_size();
}
}
fmt::format_to(std::back_inserter(msg), "Total usage: {} / {} bytes ({:.1f}%).\n", total_bytes, m_queue.get_global_memory_total_size_bytes(),
100 * static_cast<double>(total_bytes) / static_cast<double>(m_queue.get_global_memory_total_size_bytes()));
throw allocation_error(msg);
};

if(!existing_buf.is_allocated()) {
replacement_buf = backing_buffer{m_buffer_infos.at(bid).construct_device(sr.range, m_queue.get_sycl_queue()), sr.offset};
} else {
// FIXME: For large buffers we might not be able to store two copies in device memory at once.
// Instead, we'd first have to transfer everything to the host and free the old buffer before allocating the new one.
// TODO: What we CAN do however already is to free the old buffer early iff we're requesting a discard_* access!
// (AND that access request covers the entirety of the old buffer!)
const auto info = is_resize_required(existing_buf, sr.range, sr.offset);
if(info.resize_required) {
replacement_buf = backing_buffer{m_buffer_infos.at(bid).construct_device(info.new_range, m_queue.get_sycl_queue()), info.new_offset};
const auto allocation_size_bytes = sr.range.size() * m_buffer_infos.at(bid).element_size;
if(!can_allocate(allocation_size_bytes)) {
// TODO: Unless this single allocation exceeds the total available memory on the device we don't need to abort right away,
// could evict other buffers first.
die(allocation_size_bytes);
}
replacement_buf = backing_buffer{m_buffer_infos.at(bid).construct_device(sr.range, m_queue), sr.offset};
} else if(const auto info = is_resize_required(existing_buf, sr.range, sr.offset); info.resize_required) {
const auto element_size = m_buffer_infos.at(bid).element_size;
const auto allocation_size_bytes = info.new_range.size() * element_size;
if(can_allocate(allocation_size_bytes)) {
// Easy path: We can just do the resize on the device directly
replacement_buf = backing_buffer{m_buffer_infos.at(bid).construct_device(info.new_range, m_queue), info.new_offset};
} else {
bool spill_to_host = false;
// Check if we can do the resize by going through host first (see if we'll be able to fit just the added elements of the resized buffer).
if(!can_allocate(allocation_size_bytes - (existing_buf.storage->get_range().size() * element_size))) {
// Final attempt: Check if we can create a new buffer with the requested size if we spill everything else to the host.
if(can_allocate(sr.range.size() * element_size, existing_buf.storage->get_range().size() * element_size)) {
spill_to_host = true;
} else {
// TODO: Same thing as above (could evict other buffers first)
die(allocation_size_bytes);
}
}

if(spill_to_host) {
CELERITY_WARN("Buffer {} cannot be resized to fit fully into device memory, spilling partially to host and only storing requested range on "
"device. Performance may be degraded.",
bid);
} else {
CELERITY_WARN("Resize of buffer {} requires temporarily copying to host memory. Performance may be degraded.", bid);
}

// Use faux host accesses to retain all data from the device (except what is going to be discarded anyway).
// TODO: This could be made more efficient, currently it may cause multiple consecutive resizes.
GridRegion<3> retain_region = subrange_to_grid_box(subrange<3>{existing_buf.offset, existing_buf.storage->get_range()});
if(!access::mode_traits::is_consumer(mode)) {
retain_region = GridRegion<3>::difference(retain_region, subrange_to_grid_box(subrange<3>{sr.offset, sr.range}));
}
retain_region.scanByBoxes([&](const GridBox<3>& box) {
const auto sr = grid_box_to_subrange(box);
access_host_buffer_impl(bid, access_mode::read, subrange<3>{sr.offset, sr.range});
});

// We now have all data "backed up" on the host, so we may deallocate the device buffer (via destructor).
existing_buf = backing_buffer{};
auto locations = m_newest_data_location.at(bid).get_region_values(retain_region);
for(auto& [box, locs] : locations) {
assert(locs == data_location::host_and_device);
m_newest_data_location.at(bid).update_region(box, data_location::host);
}

// Finally create the new device buffer. It will be made coherent with data from the host below.
// If we have to spill to host, only allocate the currently requested subrange. Otherwise use bounding box of existing and new range.
replacement_buf = backing_buffer{
m_buffer_infos.at(bid).construct_device(spill_to_host ? sr.range : info.new_range, m_queue), spill_to_host ? sr.offset : info.new_offset};
}
}

@@ -106,7 +169,11 @@

buffer_manager::access_info buffer_manager::access_host_buffer(buffer_id bid, access_mode mode, const subrange<3>& sr) {
std::unique_lock lock(m_mutex);
assert((range_cast<3>(sr.offset + sr.range) <= m_buffer_infos.at(bid).range) == range(true, true, true));
return access_host_buffer_impl(bid, mode, sr);
}

buffer_manager::access_info buffer_manager::access_host_buffer_impl(const buffer_id bid, const access_mode mode, const subrange<3>& sr) {
assert((range_cast<3>(sr.offset + sr.range) <= m_buffer_infos.at(bid).range) == range<3>(true, true, true));

auto& existing_buf = m_buffers[bid].host_buf;
backing_buffer replacement_buf;
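
To summarize the control flow this commit adds to access_device_buffer above, here is a condensed decision helper. It is not part of the commit; the names are illustrative, and can_allocate stands in for buffer_manager::can_allocate(size_bytes, assume_bytes_freed) from the header diff.

#include <cstddef>
#include <functional>
#include <stdexcept>

// Hypothetical distillation of the resize decision in access_device_buffer.
enum class resize_strategy {
    device_resize, // allocate the enlarged buffer next to the old one, copy on the device
    through_host,  // back the old contents up on the host, free, then allocate the enlarged buffer
    spill_to_host, // keep only the requested subrange on the device, the rest stays on the host
};

resize_strategy choose_resize_strategy(size_t new_bounding_box_bytes, size_t existing_bytes, size_t requested_bytes,
    const std::function<bool(size_t, size_t)>& can_allocate) {
    if(can_allocate(new_bounding_box_bytes, 0)) return resize_strategy::device_resize;
    // Will the *added* part fit once the old allocation has been released?
    if(can_allocate(new_bounding_box_bytes - existing_bytes, 0)) return resize_strategy::through_host;
    // Last resort: does just the requested range fit if we free the old buffer first?
    if(can_allocate(requested_bytes, existing_bytes)) return resize_strategy::spill_to_host;
    throw std::runtime_error("buffer does not fit into device memory");
}

int main() {
    // Toy budget: 1000-byte device, 95% usable, 600 bytes currently held by the old buffer.
    const size_t total = 1000, used = 600;
    auto can_allocate = [&](size_t bytes, size_t assume_freed) {
        return static_cast<double>(used - assume_freed + bytes) / static_cast<double>(total) < 0.95;
    };
    // Growing the 600-byte buffer to an 800-byte bounding box: both copies do not fit at once,
    // but the 200 added bytes do fit once the old allocation is gone, so we go through the host.
    const auto strategy = choose_resize_strategy(/* new bbox */ 800, /* existing */ 600, /* requested */ 700, can_allocate);
    return strategy == resize_strategy::through_host ? 0 : 1;
}
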
2 changes: 2 additions & 0 deletions src/device_queue.cc
@@ -20,6 +20,8 @@ namespace detail {
auto device = std::visit(
[&cfg](const auto& value) { return ::celerity::detail::pick_device(cfg, value, cl::sycl::platform::get_platforms()); }, user_device_or_selector);
m_sycl_queue = std::make_unique<cl::sycl::queue>(device, handle_exceptions, props);

m_global_mem_total_size_bytes = m_sycl_queue->get_device().get_info<sycl::info::device::global_mem_size>();
}

void device_queue::handle_async_exceptions(cl::sycl::exception_list el) const {
25 changes: 15 additions & 10 deletions src/worker_job.cc
@@ -212,20 +212,25 @@ namespace detail {
for(size_t i = 0; i < access_map.get_num_accesses(); ++i) {
const auto [bid, mode] = access_map.get_nth_access(i);
const auto sr = grid_box_to_subrange(access_map.get_requirements_for_nth_access(i, tsk->get_dimensions(), data.sr, tsk->get_global_size()));
const auto info = m_buffer_mngr.access_device_buffer(bid, mode, sr);

try {
const auto info = m_buffer_mngr.access_device_buffer(bid, mode, sr);
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
auto* const oob_idx = sycl::malloc_shared<id<3>>(2, m_queue.get_sycl_queue());
assert(oob_idx != nullptr);
constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
const auto buffer_dims = m_buffer_mngr.get_buffer_info(bid).dimensions;
oob_idx[0] = id<3>{size_t_max, buffer_dims > 1 ? size_t_max : 0, buffer_dims == 3 ? size_t_max : 0};
oob_idx[1] = id<3>{1, 1, 1};
m_oob_indices_per_accessor.push_back(oob_idx);
accessor_infos.push_back(closure_hydrator::accessor_info{info.ptr, info.backing_buffer_range, info.backing_buffer_offset, sr, oob_idx});
auto* const oob_idx = sycl::malloc_shared<id<3>>(2, m_queue.get_sycl_queue());
assert(oob_idx != nullptr);
constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
const auto buffer_dims = m_buffer_mngr.get_buffer_info(bid).dimensions;
oob_idx[0] = id<3>{size_t_max, buffer_dims > 1 ? size_t_max : 0, buffer_dims == 3 ? size_t_max : 0};
oob_idx[1] = id<3>{1, 1, 1};
m_oob_indices_per_accessor.push_back(oob_idx);
accessor_infos.push_back(closure_hydrator::accessor_info{info.ptr, info.backing_buffer_range, info.backing_buffer_offset, sr, oob_idx});
#else
accessor_infos.push_back(closure_hydrator::accessor_info{info.ptr, info.backing_buffer_range, info.backing_buffer_offset, sr});
accessor_infos.push_back(closure_hydrator::accessor_info{info.ptr, info.backing_buffer_range, info.backing_buffer_offset, sr});
#endif
} catch(allocation_error& e) {
CELERITY_CRITICAL("Encountered allocation error while trying to prepare {}", get_description(pkg));
std::terminate();
}
}

for(size_t i = 0; i < reductions.size(); ++i) {