Reduce peak memory requirement in graph creation (part 2/2) (#2081)
This PR reduces peak memory usage in graph creation and replaces one large allocation with a few smaller allocations of the same aggregate size (this works better with the pool allocator). Now we can create a graph from 2^30 edges (64-bit vertex IDs & 4-byte edge weights, 20 bytes per edge) while limiting the maximum pool size to 36 GB.
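As a back-of-the-envelope check of those figures (illustrative only, not part of the PR): two 64-bit vertex IDs plus a 4-byte weight give 20 bytes per edge, so the raw edge list for 2^30 edges alone occupies 20 GiB, and the intermediate buffers built during graph creation have to fit into the remaining headroom of the 36 GB pool.

// Sizing sketch for the numbers quoted in the PR description (illustrative only).
#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
  constexpr std::size_t bytes_per_edge = 2 * sizeof(std::int64_t) + sizeof(float);  // 8 + 8 + 4 = 20
  constexpr std::size_t num_edges      = std::size_t{1} << 30;                      // 2^30 edges
  std::printf("%zu bytes/edge, %zu GiB for the raw edge list\n",
              bytes_per_edge, (bytes_per_edge * num_edges) >> 30);                  // 20 GiB
  return 0;
}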

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Kumar Aatish (https://github.com/kaatish)

URL: #2081
seunghwak authored Mar 8, 2022
1 parent 7436a04 commit 93dba00
Showing 15 changed files with 1,088 additions and 665 deletions.
16 changes: 15 additions & 1 deletion cpp/include/cugraph/graph.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -97,6 +97,13 @@ class graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu, std::enab
graph_meta_t<vertex_t, edge_t, multi_gpu> meta,
bool do_expensive_check = false);

graph_t(raft::handle_t const& handle,
std::vector<rmm::device_uvector<vertex_t>>&& edgelist_src_partitions,
std::vector<rmm::device_uvector<vertex_t>>&& edgelist_dst_partitions,
std::optional<std::vector<rmm::device_uvector<weight_t>>>&& edge_weight_partitions,
graph_meta_t<vertex_t, edge_t, multi_gpu> meta,
bool do_expensive_check = false);

/**
* @brief Symmetrize this graph.
*
@@ -264,6 +271,13 @@ class graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu, std::enab
graph_meta_t<vertex_t, edge_t, multi_gpu> meta,
bool do_expensive_check = false);

graph_t(raft::handle_t const& handle,
rmm::device_uvector<vertex_t>&& edgelist_srcs,
rmm::device_uvector<vertex_t>&& edgelist_dsts,
std::optional<rmm::device_uvector<weight_t>>&& edgelist_weights,
graph_meta_t<vertex_t, edge_t, multi_gpu> meta,
bool do_expensive_check = false);

/**
* @brief Symmetrize this graph.
*
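The new graph_t overloads above take the edge list as per-partition rmm::device_uvectors by rvalue reference rather than as non-owning views. A plausible reading, in line with the PR's goal, is that transferring ownership lets the constructor release each input partition as soon as it has been ingested, keeping the peak at roughly "output plus one partition" instead of "output plus the whole input". A generic host-side sketch of that move-in-and-consume pattern (std::vector standing in for rmm::device_uvector; not the actual graph_t implementation):

#include <utility>
#include <vector>

std::vector<int> build_from_partitions(std::vector<std::vector<int>>&& partitions) {
  std::vector<int> out;
  for (auto& part : partitions) {
    out.insert(out.end(), part.begin(), part.end());  // ingest one partition
    part.clear();
    part.shrink_to_fit();  // release its memory before touching the next partition
  }
  return out;
}

int main() {
  std::vector<std::vector<int>> parts{{1, 2}, {3, 4}, {5}};
  auto flat = build_from_partitions(std::move(parts));  // caller hands over ownership
  return flat.size() == 5 ? 0 : 1;
}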
8 changes: 4 additions & 4 deletions cpp/include/cugraph/graph_functions.hpp
@@ -351,16 +351,16 @@ symmetrize_edgelist(raft::handle_t const& handle,
* @param graph_view Graph view object of the input graph to be coarsened.
* @param labels Vertex labels (assigned to this process in multi-GPU) to be used in coarsening.
* @param do_expensive_check A flag to run expensive checks for input arguments (if set to `true`).
* @return std::tuple<std::unique_ptr<graph_t<vertex_t, edge_t, weight_t, store_transposed,
* multi_gpu>>, rmm::device_uvector<vertex_t>> Tuple of the coarsened graph and labels mapped to the
* vertices (assigned to this process in multi-GPU) in the coarsened graph.
* @return std::tuple<graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu>,
* rmm::device_uvector<vertex_t>> Tuple of the coarsened graph and labels mapped to the vertices
* (assigned to this process in multi-GPU) in the coarsened graph.
*/
template <typename vertex_t,
typename edge_t,
typename weight_t,
bool store_transposed,
bool multi_gpu>
std::tuple<std::unique_ptr<graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu>>,
std::tuple<graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu>,
rmm::device_uvector<vertex_t>>
coarsen_graph(
raft::handle_t const& handle,
2 changes: 2 additions & 0 deletions cpp/include/cugraph/utilities/misc_utils.cuh
@@ -67,6 +67,8 @@ std::tuple<std::vector<vertex_t>, std::vector<edge_t>> compute_offset_aligned_ed
d_vertex_offsets.size(),
handle.get_stream());

handle.sync_stream();

return std::make_tuple(h_vertex_offsets, h_edge_offsets);
}

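The handle.sync_stream() added above ensures the stream-ordered device-to-host copies have completed before the host vectors are returned to the caller. A standalone CUDA runtime analogue of the same pattern (not cuGraph code):

#include <cstdio>
#include <cuda_runtime.h>

int main() {
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  int h_in = 42, h_out = 0;
  int* d_buf = nullptr;
  cudaMalloc(&d_buf, sizeof(int));

  cudaMemcpyAsync(d_buf, &h_in, sizeof(int), cudaMemcpyHostToDevice, stream);
  cudaMemcpyAsync(&h_out, d_buf, sizeof(int), cudaMemcpyDeviceToHost, stream);
  cudaStreamSynchronize(stream);  // analogous to handle.sync_stream()
  std::printf("%d\n", h_out);     // only now is h_out guaranteed to hold 42

  cudaFree(d_buf);
  cudaStreamDestroy(stream);
  return 0;
}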
182 changes: 178 additions & 4 deletions cpp/include/cugraph/utilities/shuffle_comm.cuh
Expand Up @@ -166,6 +166,135 @@ struct kv_pair_group_id_greater_equal_t {
}
};

template <typename ValueIterator>
void swap_partitions(ValueIterator value_first,
ValueIterator value_last,
size_t first_partition_size,
rmm::cuda_stream_view stream_view)
{
auto num_elements = static_cast<size_t>(thrust::distance(value_first, value_last));
auto second_partition_size = num_elements - first_partition_size;
if (first_partition_size >= second_partition_size) {
auto tmp_value_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
first_partition_size, stream_view);

thrust::copy(rmm::exec_policy(stream_view),
value_first,
value_first + first_partition_size,
get_dataframe_buffer_begin(tmp_value_buffer));

thrust::copy(rmm::exec_policy(stream_view),
value_first + first_partition_size,
value_first + num_elements,
value_first);

thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_value_buffer),
get_dataframe_buffer_end(tmp_value_buffer),
value_first + second_partition_size);
} else {
auto tmp_value_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
second_partition_size, stream_view);

thrust::copy(rmm::exec_policy(stream_view),
value_first + first_partition_size,
value_first + num_elements,
get_dataframe_buffer_begin(tmp_value_buffer));

thrust::copy(rmm::exec_policy(stream_view),
value_first,
value_first + first_partition_size,
value_first + (num_elements - first_partition_size));

thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_value_buffer),
get_dataframe_buffer_end(tmp_value_buffer),
value_first);
}
}

template <typename KeyIterator, typename ValueIterator>
void swap_partitions(KeyIterator key_first,
KeyIterator key_last,
ValueIterator value_first,
size_t first_partition_size,
rmm::cuda_stream_view stream_view)
{
auto num_elements = static_cast<size_t>(thrust::distance(key_first, key_last));
auto second_partition_size = num_elements - first_partition_size;
if (first_partition_size >= second_partition_size) {
auto tmp_key_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<KeyIterator>::value_type>(
first_partition_size, stream_view);
auto tmp_value_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
first_partition_size, stream_view);

thrust::copy(rmm::exec_policy(stream_view),
key_first,
key_first + first_partition_size,
get_dataframe_buffer_begin(tmp_key_buffer));
thrust::copy(rmm::exec_policy(stream_view),
value_first,
value_first + first_partition_size,
get_dataframe_buffer_begin(tmp_value_buffer));

thrust::copy(rmm::exec_policy(stream_view),
key_first + first_partition_size,
key_first + num_elements,
key_first);
thrust::copy(rmm::exec_policy(stream_view),
value_first + first_partition_size,
value_first + num_elements,
value_first);

thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_key_buffer),
get_dataframe_buffer_end(tmp_key_buffer),
key_first + second_partition_size);
thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_value_buffer),
get_dataframe_buffer_end(tmp_value_buffer),
value_first + second_partition_size);
} else {
auto tmp_key_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<KeyIterator>::value_type>(
second_partition_size, stream_view);
auto tmp_value_buffer =
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
second_partition_size, stream_view);

thrust::copy(rmm::exec_policy(stream_view),
key_first + first_partition_size,
key_first + num_elements,
get_dataframe_buffer_begin(tmp_key_buffer));
thrust::copy(rmm::exec_policy(stream_view),
value_first + first_partition_size,
value_first + num_elements,
get_dataframe_buffer_begin(tmp_value_buffer));

thrust::copy(rmm::exec_policy(stream_view),
key_first,
key_first + first_partition_size,
key_first + (num_elements - first_partition_size));
thrust::copy(rmm::exec_policy(stream_view),
value_first,
value_first + first_partition_size,
value_first + (num_elements - first_partition_size));

thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_key_buffer),
get_dataframe_buffer_end(tmp_key_buffer),
key_first);
thrust::copy(rmm::exec_policy(stream_view),
get_dataframe_buffer_begin(tmp_value_buffer),
get_dataframe_buffer_end(tmp_value_buffer),
value_first);
}
}

// Uses roughly half the temporary buffer of thrust::partition (if the first & second partition
// sizes are comparable). This also uses multiple smaller allocations instead of one single
// allocation (as thrust::sort does) of the same aggregate size if the input iterators are zip
// iterators (this is more
@@ -330,8 +459,28 @@ void mem_frugal_groupby(
});
}
} else {
auto second_first = mem_frugal_partition(
value_firsts[i], value_lasts[i], value_to_group_id_op, pivot, stream_view);
ValueIterator second_first{};
auto num_elements = static_cast<size_t>(thrust::distance(value_firsts[i], value_lasts[i]));
auto first_chunk_partition_first = mem_frugal_partition(value_firsts[i],
value_firsts[i] + num_elements / 2,
value_to_group_id_op,
pivot,
stream_view);
auto second_chunk_partition_first = mem_frugal_partition(value_firsts[i] + num_elements / 2,
value_lasts[i],
value_to_group_id_op,
pivot,
stream_view);
auto no_less_size = static_cast<size_t>(
thrust::distance(first_chunk_partition_first, value_firsts[i] + num_elements / 2));
auto less_size = static_cast<size_t>(
thrust::distance(value_firsts[i] + num_elements / 2, second_chunk_partition_first));
swap_partitions(value_firsts[i] + (num_elements / 2 - no_less_size),
value_firsts[i] + (num_elements / 2 + less_size),
no_less_size,
stream_view);

second_first = value_firsts[i] + ((num_elements / 2 - no_less_size) + less_size);
if (pivot - group_firsts[i] > 1) {
group_firsts.push_back(group_firsts[i]);
group_lasts.push_back(pivot);
@@ -402,8 +551,33 @@ void mem_frugal_groupby(
});
}
} else {
auto second_first = mem_frugal_partition(
key_firsts[i], key_lasts[i], value_firsts[i], key_to_group_id_op, pivot, stream_view);
std::tuple<KeyIterator, ValueIterator> second_first{};
auto num_elements = static_cast<size_t>(thrust::distance(key_firsts[i], key_lasts[i]));
auto first_chunk_partition_first = mem_frugal_partition(key_firsts[i],
key_firsts[i] + num_elements / 2,
value_firsts[i],
key_to_group_id_op,
pivot,
stream_view);
auto second_chunk_partition_first = mem_frugal_partition(key_firsts[i] + num_elements / 2,
key_lasts[i],
value_firsts[i] + num_elements / 2,
key_to_group_id_op,
pivot,
stream_view);
auto no_less_size = static_cast<size_t>(thrust::distance(
std::get<0>(first_chunk_partition_first), key_firsts[i] + num_elements / 2));
auto less_size = static_cast<size_t>(thrust::distance(
key_firsts[i] + num_elements / 2, std::get<0>(second_chunk_partition_first)));
swap_partitions(key_firsts[i] + (num_elements / 2 - no_less_size),
key_firsts[i] + (num_elements / 2 + less_size),
value_firsts[i] + (num_elements / 2 - no_less_size),
no_less_size,
stream_view);

second_first =
std::make_tuple(key_firsts[i] + ((num_elements / 2 - no_less_size) + less_size),
value_firsts[i] + ((num_elements / 2 - no_less_size) + less_size));
if (pivot - group_firsts[i] > 1) {
group_firsts.push_back(group_firsts[i]);
group_lasts.push_back(pivot);
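Taken together, the additions to shuffle_comm.cuh above split the range in half, partition each half independently (so the scratch space used while partitioning covers only half the data at a time), and then call swap_partitions to exchange the two middle blocks, leaving the whole range partitioned around the pivot. A host-side sketch of the same scheme in plain STL (assumed equivalent logic, not the device implementation):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Swap adjacent blocks [A|B] -> [B|A] with a scratch buffer sized to the larger block
// (roughly half the range when |A| and |B| are comparable), mirroring swap_partitions.
void swap_adjacent_blocks(std::vector<int>& v, std::size_t begin, std::size_t split, std::size_t end) {
  std::size_t a_size = split - begin;
  std::size_t b_size = end - split;
  if (a_size >= b_size) {
    std::vector<int> tmp(v.begin() + begin, v.begin() + split);                   // stash A
    std::copy(v.begin() + split, v.begin() + end, v.begin() + begin);             // move B to the front
    std::copy(tmp.begin(), tmp.end(), v.begin() + begin + b_size);                // place A after B
  } else {
    std::vector<int> tmp(v.begin() + split, v.begin() + end);                     // stash B
    std::copy(v.begin() + begin, v.begin() + split, v.begin() + begin + b_size);  // move A to the back
    std::copy(tmp.begin(), tmp.end(), v.begin() + begin);                         // place B at the front
  }
}

int main() {
  std::vector<int> v{7, 2, 9, 1, 8, 3, 6, 5};
  int pivot = 5;  // partition into [< 5 | >= 5]
  auto less_than_pivot = [pivot](int x) { return x < pivot; };

  // Partition each half independently (stand-in for mem_frugal_partition on each chunk) ...
  std::size_t half = v.size() / 2;
  auto first_chunk_split = static_cast<std::size_t>(
    std::stable_partition(v.begin(), v.begin() + half, less_than_pivot) - v.begin());
  auto second_chunk_split = static_cast<std::size_t>(
    std::stable_partition(v.begin() + half, v.end(), less_than_pivot) - v.begin());

  // ... then swap the middle ">= pivot" and "< pivot" blocks so the whole range is partitioned.
  swap_adjacent_blocks(v, first_chunk_split, half, second_chunk_split);

  std::size_t boundary = first_chunk_split + (second_chunk_split - half);  // start of the >= pivot block
  assert(std::all_of(v.begin(), v.begin() + boundary, less_than_pivot));
  assert(std::all_of(v.begin() + boundary, v.end(), [pivot](int x) { return x >= pivot; }));
  return 0;
}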
5 changes: 3 additions & 2 deletions cpp/src/community/louvain.cuh
@@ -137,6 +137,7 @@ class Louvain {
#endif
handle_(handle),
dendrogram_(std::make_unique<Dendrogram<vertex_t>>()),
current_graph_(handle),
current_graph_view_(graph_view),
cluster_keys_v_(0, handle.get_stream()),
cluster_weights_v_(0, handle.get_stream()),
@@ -559,7 +560,7 @@ class Louvain {
std::tie(current_graph_, numbering_map) =
coarsen_graph(handle_, current_graph_view_, dendrogram_->current_level_begin());

current_graph_view_ = current_graph_->view();
current_graph_view_ = current_graph_.view();

rmm::device_uvector<vertex_t> numbering_indices(numbering_map.size(), handle_.get_stream());
thrust::sequence(handle_.get_thrust_policy(),
@@ -589,7 +590,7 @@ class Louvain {
// but as we shrink the graph we'll keep the
// current graph here
//
std::unique_ptr<graph_t> current_graph_{};
graph_t current_graph_;
graph_view_t current_graph_view_;

rmm::device_uvector<vertex_t> cluster_keys_v_;
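A minimal analogy (not cuGraph code) for the louvain.cuh change above: holding the coarsened graph by value instead of through std::unique_ptr drops one heap allocation and one level of indirection, but the member now needs a valid object from the moment the class is constructed, which is why the constructor gains the current_graph_(handle) initializer.

#include <memory>

struct handle_t {};
struct graph {                 // stand-in for cugraph::graph_t
  explicit graph(handle_t const&) {}
};

struct louvain_before {        // old layout: empty until coarsening assigns it
  std::unique_ptr<graph> current_graph_{};
};

struct louvain_after {         // new layout: an (empty) graph is built up front, reassigned later
  explicit louvain_after(handle_t const& handle) : current_graph_(handle) {}
  graph current_graph_;
};

int main() {
  handle_t handle;
  louvain_before before;
  louvain_after after{handle};
  (void)before;
  (void)after;
  return 0;
}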
56 changes: 51 additions & 5 deletions cpp/src/detail/shuffle_wrappers.cu
@@ -16,6 +16,7 @@
#include <cugraph/detail/graph_utils.cuh>
#include <cugraph/detail/shuffle_wrappers.hpp>
#include <cugraph/partition_manager.hpp>
#include <cugraph/utilities/host_scalar_comm.cuh>
#include <cugraph/utilities/shuffle_comm.cuh>

#include <rmm/exec_policy.hpp>
@@ -51,6 +52,12 @@ shuffle_edgelist_by_gpu_id(raft::handle_t const& handle,
auto mem_frugal_threshold =
static_cast<size_t>(static_cast<double>(total_global_mem / element_size) * mem_frugal_ratio);

auto mem_frugal_flag =
host_scalar_allreduce(comm,
d_edgelist_majors.size() > mem_frugal_threshold ? int{1} : int{0},
raft::comms::op_t::MAX,
handle.get_stream());

// invoke groupby_and_count and shuffle values to pass mem_frugal_threshold instead of directly
// calling groupby_gpu_id_and_shuffle_values (there is no benefit in reducing peak memory as we
// need to allocate a receive buffer anyways) but this reduces the maximum memory allocation size
@@ -84,9 +91,32 @@ shuffle_edgelist_by_gpu_id(raft::handle_t const& handle,
handle.get_stream());
handle.sync_stream();

std::forward_as_tuple(
std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors, d_rx_edgelist_weights), std::ignore) =
shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream());
if (mem_frugal_flag) {  // trade off potential parallelism for lower peak memory
std::tie(d_rx_edgelist_majors, std::ignore) =
shuffle_values(comm, d_edgelist_majors.begin(), h_tx_value_counts, handle.get_stream());
d_edgelist_majors.resize(0, handle.get_stream());
d_edgelist_majors.shrink_to_fit(handle.get_stream());

std::tie(d_rx_edgelist_minors, std::ignore) =
shuffle_values(comm, d_edgelist_minors.begin(), h_tx_value_counts, handle.get_stream());
d_edgelist_minors.resize(0, handle.get_stream());
d_edgelist_minors.shrink_to_fit(handle.get_stream());

std::tie(d_rx_edgelist_weights, std::ignore) =
shuffle_values(comm, (*d_edgelist_weights).begin(), h_tx_value_counts, handle.get_stream());
(*d_edgelist_weights).resize(0, handle.get_stream());
(*d_edgelist_weights).shrink_to_fit(handle.get_stream());
} else {
std::forward_as_tuple(
std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors, d_rx_edgelist_weights), std::ignore) =
shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream());
d_edgelist_majors.resize(0, handle.get_stream());
d_edgelist_majors.shrink_to_fit(handle.get_stream());
d_edgelist_minors.resize(0, handle.get_stream());
d_edgelist_minors.shrink_to_fit(handle.get_stream());
(*d_edgelist_weights).resize(0, handle.get_stream());
(*d_edgelist_weights).shrink_to_fit(handle.get_stream());
}
} else {
auto edge_first = thrust::make_zip_iterator(
thrust::make_tuple(d_edgelist_majors.begin(), d_edgelist_minors.begin()));
@@ -110,8 +140,24 @@ shuffle_edgelist_by_gpu_id(raft::handle_t const& handle,
handle.get_stream());
handle.sync_stream();

std::forward_as_tuple(std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors), std::ignore) =
shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream());
if (mem_frugal_flag) {  // trade off potential parallelism for lower peak memory
std::tie(d_rx_edgelist_majors, std::ignore) =
shuffle_values(comm, d_edgelist_majors.begin(), h_tx_value_counts, handle.get_stream());
d_edgelist_majors.resize(0, handle.get_stream());
d_edgelist_majors.shrink_to_fit(handle.get_stream());

std::tie(d_rx_edgelist_minors, std::ignore) =
shuffle_values(comm, d_edgelist_minors.begin(), h_tx_value_counts, handle.get_stream());
d_edgelist_minors.resize(0, handle.get_stream());
d_edgelist_minors.shrink_to_fit(handle.get_stream());
} else {
std::forward_as_tuple(std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors), std::ignore) =
shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream());
d_edgelist_majors.resize(0, handle.get_stream());
d_edgelist_majors.shrink_to_fit(handle.get_stream());
d_edgelist_minors.resize(0, handle.get_stream());
d_edgelist_minors.shrink_to_fit(handle.get_stream());
}
}

return std::make_tuple(std::move(d_rx_edgelist_majors),
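The mem_frugal path above shuffles one column at a time, freeing each send column right after its shuffle, instead of shuffling all columns through a single zip iterator. With roughly balanced send and receive sizes, the zipped shuffle holds every send column plus every receive column at once, while the per-column path holds at most one extra receive column at its peak. A small illustration of that accounting (an assumed simplification that ignores allocator overhead and imbalance):

#include <algorithm>
#include <cstdio>

int main() {
  // Sizes in "column units": src, dst, weight columns, each roughly the same size,
  // with receive buffers assumed comparable to send buffers.
  const int num_columns = 3;

  // Zipped shuffle: every send column and every receive column is live at once.
  int peak_zipped = num_columns /* send */ + num_columns /* receive */;

  // Per-column shuffle: while shuffling column k, the not-yet-shuffled send columns,
  // the already-received columns, and column k's receive buffer are live.
  int peak_per_column = 0;
  for (int k = 0; k < num_columns; ++k) {
    int live = (num_columns - k) /* send columns still held */ +
               k                 /* receive columns already kept */ +
               1                 /* receive buffer for column k */;
    peak_per_column = std::max(peak_per_column, live);
  }

  std::printf("zipped peak: %d column units, per-column peak: %d column units\n",
              peak_zipped, peak_per_column);  // 6 vs 4 with three columns
  return 0;
}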