From ad74817603ae77e7e2eff6a1f30b2976410d3949 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com> Date: Thu, 27 Jul 2023 13:32:46 -0700 Subject: [PATCH] Compute shortest distances between given sets of origins and destinations for large diameter graphs (#3741) This code computes shortest distances between given origins and destinations and produces an OD matrix. This code runs many SSSPs in parallel to extract sufficient parallelism considering that an SSSP run on a large diameter graph has only limited amount of parallelism. This code uses hash table (cuco::static_map) to reduce memory footprint (and this enable running many SSSPs by cutting memory requirement to store distances from O(V) to the number of vertices around the active frontier). For small diameter graphs (where one can extract sufficient parallelism with only a single SSSP run), sequentially running multiple SSSPs may be more performant (as hash table access is more expensive than linear array access + additional overheads to run many SSSPs in parallel). Authors: - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Brad Rees (https://github.com/BradReesWork) - Chuck Hastings (https://github.com/ChuckHastings) - Naim (https://github.com/naimnv) URL: https://github.com/rapidsai/cugraph/pull/3741 --- cpp/CMakeLists.txt | 1 + cpp/include/cugraph/algorithms.hpp | 39 + .../cugraph/edge_partition_device_view.cuh | 44 +- .../cugraph/utilities/dataframe_buffer.hpp | 16 + .../weakly_connected_components_impl.cuh | 4 +- .../detail/extract_transform_v_frontier_e.cuh | 4 +- cpp/src/prims/key_store.cuh | 460 +++++++ cpp/src/prims/kv_store.cuh | 9 +- ...r_v_random_select_transform_outgoing_e.cuh | 4 +- ...rm_reduce_v_frontier_outgoing_e_by_dst.cuh | 17 +- cpp/src/prims/vertex_frontier.cuh | 53 +- .../traversal/od_shortest_distances_impl.cuh | 1092 +++++++++++++++++ cpp/src/traversal/od_shortest_distances_sg.cu | 76 ++ cpp/tests/CMakeLists.txt | 4 + .../traversal/od_shortest_distances_test.cpp | 286 +++++ 15 files changed, 2048 insertions(+), 61 deletions(-) create mode 100644 cpp/src/prims/key_store.cuh create mode 100644 cpp/src/traversal/od_shortest_distances_impl.cuh create mode 100644 cpp/src/traversal/od_shortest_distances_sg.cu create mode 100644 cpp/tests/traversal/od_shortest_distances_test.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 90f33574d6f..85f8144d0d3 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -270,6 +270,7 @@ set(CUGRAPH_SOURCES src/traversal/bfs_sg.cu src/traversal/bfs_mg.cu src/traversal/sssp_sg.cu + src/traversal/od_shortest_distances_sg.cu src/traversal/sssp_mg.cu src/link_analysis/hits_sg.cu src/link_analysis/hits_mg.cu diff --git a/cpp/include/cugraph/algorithms.hpp b/cpp/include/cugraph/algorithms.hpp index 547a298fba9..29a488e7505 100644 --- a/cpp/include/cugraph/algorithms.hpp +++ b/cpp/include/cugraph/algorithms.hpp @@ -1179,6 +1179,45 @@ void sssp(raft::handle_t const& handle, weight_t cutoff = std::numeric_limits::max(), bool do_expensive_check = false); +/* + * @brief Compute the shortest distances from the given origins to all the given destinations. + * + * This algorithm is designed for large diameter graphs. For small diameter graphs, running the + * cugraph::sssp function in a sequentially executed loop might be faster. This algorithms currently + * works only for single-GPU (we are not aware of large diameter graphs that won't fit in a single + * GPU). + * + * @throws cugraph::logic_error on erroneous input arguments. 
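 *
 * Illustrative usage (a minimal single-GPU sketch; @p handle, @p graph_view, @p edge_weight_view,
 * @p origins, and @p destinations are assumed to have been constructed by the caller, with the
 * origin/destination spans holding vertex IDs in device memory):
 * @code
 * auto distances = cugraph::od_shortest_distances(
 *   handle, graph_view, edge_weight_view, origins, destinations);
 * // distances[i * destinations.size() + j] is the shortest distance from the i'th origin to the
 * // j'th destination (unreachable pairs hold the maximum weight_t value)
 * @endcode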
+ * + * @tparam vertex_t Type of vertex identifiers. Needs to be an integral type. + * @tparam edge_t Type of edge identifiers. Needs to be an integral type. + * @tparam weight_t Type of edge weights. Needs to be a floating point type. + * @tparam multi_gpu Flag indicating whether template instantiation should target single-GPU (false) + * or multi-GPU (true). + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param graph_view Graph view object. + * @param edge_weight_view View object holding edge weights for @p graph_view. + * @param origins An array of origins (starting vertices) to find shortest distances. There should + * be no duplicates in @p origins. + * @param destinations An array of destinations (end vertices) to find shortest distances. There + * should be no duplicates in @p destinations. + * @param cutoff Any destinations farther than @p cutoff will be marked as unreachable. + * @param do_expensive_check A flag to run expensive checks for input arguments (if set to `true`). + * @return A vector of size @p origins.size() * @p destinations.size(). The i'th element of the + * returned vector is the shortest distance from the (i / @p destinations.size())'th origin to the + * (i % @p destinations.size())'th destination. + */ +template +rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + weight_t cutoff = std::numeric_limits::max(), + bool do_expensive_check = false); + /** * @brief Compute PageRank scores. * diff --git a/cpp/include/cugraph/edge_partition_device_view.cuh b/cpp/include/cugraph/edge_partition_device_view.cuh index 02b931fbde6..d34d639f4d9 100644 --- a/cpp/include/cugraph/edge_partition_device_view.cuh +++ b/cpp/include/cugraph/edge_partition_device_view.cuh @@ -156,13 +156,15 @@ class edge_partition_device_view_t majors, + template + size_t compute_number_of_edges(MajorIterator major_first, + MajorIterator major_last, rmm::cuda_stream_view stream) const { return dcs_nzd_vertices_ ? 
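// dcs_nzd_vertices_ being set indicates that this edge partition has a hypersparse (DCS)
// segment; majors falling in that segment are located in dcs_nzd_vertices_ before their local
// degrees are accumulated, hence the separate functor instantiation below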
thrust::transform_reduce( rmm::exec_policy(stream), - majors.begin(), - majors.end(), + major_first, + major_last, detail::local_degree_op_t< vertex_t, edge_t, @@ -176,8 +178,8 @@ class edge_partition_device_view_t()) : thrust::transform_reduce( rmm::exec_policy(stream), - majors.begin(), - majors.end(), + major_first, + major_last, detail::local_degree_op_t< vertex_t, edge_t, @@ -217,15 +219,17 @@ class edge_partition_device_view_t compute_local_degrees(raft::device_span majors, + template + rmm::device_uvector compute_local_degrees(MajorIterator major_first, + MajorIterator major_last, rmm::cuda_stream_view stream) const { - rmm::device_uvector local_degrees(majors.size(), stream); + rmm::device_uvector local_degrees(thrust::distance(major_first, major_last), stream); if (dcs_nzd_vertices_) { assert(major_hypersparse_first_); thrust::transform(rmm::exec_policy(stream), - majors.begin(), - majors.end(), + major_first, + major_last, local_degrees.begin(), detail::local_degree_op_t{ this->offsets_, @@ -235,8 +239,8 @@ class edge_partition_device_view_t{ this->offsets_, major_range_first_, std::byte{0} /* dummy */, std::byte{0} /* dummy */}); @@ -349,13 +353,15 @@ class edge_partition_device_view_t majors, + template + size_t compute_number_of_edges(MajorIterator major_first, + MajorIterator major_last, rmm::cuda_stream_view stream) const { return thrust::transform_reduce( rmm::exec_policy(stream), - majors.begin(), - majors.end(), + major_first, + major_last, detail::local_degree_op_t compute_local_degrees(raft::device_span majors, + template + rmm::device_uvector compute_local_degrees(MajorIterator major_first, + MajorIterator major_last, rmm::cuda_stream_view stream) const { - rmm::device_uvector local_degrees(majors.size(), stream); + rmm::device_uvector local_degrees(thrust::distance(major_first, major_last), stream); thrust::transform(rmm::exec_policy(stream), - majors.begin(), - majors.end(), + major_first, + major_last, local_degrees.begin(), detail::local_degree_op_t{ this->offsets_, diff --git a/cpp/include/cugraph/utilities/dataframe_buffer.hpp b/cpp/include/cugraph/utilities/dataframe_buffer.hpp index 49898f6c855..0e045ed7dbd 100644 --- a/cpp/include/cugraph/utilities/dataframe_buffer.hpp +++ b/cpp/include/cugraph/utilities/dataframe_buffer.hpp @@ -99,6 +99,22 @@ auto allocate_dataframe_buffer(size_t buffer_size, rmm::cuda_stream_view stream_ std::make_index_sequence(), buffer_size, stream_view); } +template +void reserve_dataframe_buffer(BufferType& buffer, + size_t new_buffer_capacity, + rmm::cuda_stream_view stream_view) +{ + static_assert(is_std_tuple_of_arithmetic_vectors>::value || + is_arithmetic_vector, rmm::device_uvector>::value); + if constexpr (is_std_tuple_of_arithmetic_vectors>::value) { + std::apply([new_buffer_capacity, stream_view]( + auto&&... 
args) { (args.reserve(new_buffer_capacity, stream_view), ...); }, + buffer); + } else { + buffer.reserve(new_buffer_capacity, stream_view); + } +} + template void resize_dataframe_buffer(BufferType& buffer, size_t new_buffer_size, diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index 5e228d2445d..615a50ded54 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -223,7 +223,7 @@ struct v_op_t { decltype(thrust::make_zip_iterator(thrust::make_tuple( static_cast(nullptr), static_cast(nullptr)))) edge_buffer_first{}; // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this requires - // placing the atomic barrier on managed memory and this adds additional complication. + // placing the atomic variable on managed memory and this adds additional complication. size_t* num_edge_inserts{}; size_t bucket_idx_next{}; size_t bucket_idx_conflict{}; // relevant only if GraphViewType::is_multi_gpu is true @@ -501,7 +501,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto edge_buffer = allocate_dataframe_buffer>(0, handle.get_stream()); // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this - // requires placing the atomic variable on managed memory and this make it less attractive. + // requires placing the atomic variable on managed memory and this adds additional complication. rmm::device_scalar num_edge_inserts(size_t{0}, handle.get_stream()); auto edge_dst_components = diff --git a/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh b/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh index ac57c8f180a..febdf61943b 100644 --- a/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh +++ b/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh @@ -877,9 +877,7 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); auto max_pushes = edge_partition.compute_number_of_edges( - raft::device_span(edge_partition_frontier_major_first, - edge_partition_frontier_major_last), - handle.get_stream()); + edge_partition_frontier_major_first, edge_partition_frontier_major_last, handle.get_stream()); auto new_buffer_size = buffer_idx.value(handle.get_stream()) + max_pushes; resize_optional_dataframe_buffer( diff --git a/cpp/src/prims/key_store.cuh b/cpp/src/prims/key_store.cuh new file mode 100644 index 00000000000..6d135b4e94e --- /dev/null +++ b/cpp/src/prims/key_store.cuh @@ -0,0 +1,460 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace cugraph { + +namespace detail { + +template +struct key_binary_search_contains_op_t { + using key_type = typename thrust::iterator_traits::value_type; + + KeyIterator store_key_first{}; + KeyIterator store_key_last{}; + + __device__ bool operator()(key_type key) const + { + return thrust::binary_search(thrust::seq, store_key_first, store_key_last, key); + } +}; + +template +struct key_binary_search_store_device_view_t { + using key_type = typename ViewType::key_type; + + static_assert(ViewType::binary_search); + + __host__ key_binary_search_store_device_view_t(ViewType view) + : store_key_first(view.store_key_first()), store_key_last(view.store_key_last()) + { + } + + __device__ bool contains(key_type key) const + { + return thrust::binary_search(thrust::seq, store_key_first, store_key_last, key); + } + + typename ViewType::key_iterator store_key_first{}; + typename ViewType::key_iterator store_key_last{}; +}; + +template +struct key_cuco_store_contains_device_view_t { + using key_type = typename ViewType::key_type; + using cuco_store_device_ref_type = + typename ViewType::cuco_store_type::ref_type; + + static_assert(!ViewType::binary_search); + + __host__ key_cuco_store_contains_device_view_t(ViewType view) + : cuco_store_device_ref(view.cuco_store_contains_device_ref()) + { + } + + __device__ bool contains(key_type key) const { return cuco_store_device_ref.contains(key); } + + cuco_store_device_ref_type cuco_store_device_ref{}; +}; + +template +struct key_cuco_store_insert_device_view_t { + using key_type = typename ViewType::key_type; + using cuco_store_device_ref_type = + typename ViewType::cuco_store_type::ref_type; + + static_assert(!ViewType::binary_search); + + __host__ key_cuco_store_insert_device_view_t(ViewType view) + : cuco_store_device_ref(view.cuco_store_insert_device_ref()) + { + } + + __device__ void insert(key_type key) { cuco_store_device_ref.insert(key); } + + cuco_store_device_ref_type cuco_store_device_ref{}; +}; + +template +class key_binary_search_store_view_t { + public: + using key_type = std::remove_cv_t::value_type>; + using key_iterator = KeyIterator; + + static constexpr bool binary_search = true; + + key_binary_search_store_view_t(KeyIterator key_first, KeyIterator key_last) + : store_key_first_(key_first), store_key_last_(key_last) + { + } + + template + void contains(QueryKeyIterator key_first, + QueryKeyIterator key_last, + ResultValueIterator value_first, + rmm::cuda_stream_view stream) const + { + thrust::transform( + rmm::exec_policy(stream), + key_first, + key_last, + value_first, + key_binary_search_contains_op_t{store_key_first_, store_key_last_}); + } + + KeyIterator store_key_first() const { return store_key_first_; } + + KeyIterator store_key_last() const { return store_key_last_; } + + private: + KeyIterator store_key_first_{}; + KeyIterator store_key_last_{}; +}; + +template +class key_cuco_store_view_t { + public: + using key_type = key_t; + + static constexpr bool binary_search = false; + + using cuco_store_type = cuco::experimental::static_set< + key_t, + cuco::experimental::extent, + cuda::thread_scope_device, + thrust::equal_to, + cuco::experimental::linear_probing<1, // CG size + cuco::murmurhash3_32>, + rmm::mr::stream_allocator_adaptor>>; + + key_cuco_store_view_t(cuco_store_type const* store) : cuco_store_(store) {} + + template + void contains(QueryKeyIterator key_first, 
+ QueryKeyIterator key_last, + ResultValueIterator value_first, + rmm::cuda_stream_view stream) const + { + cuco_store_->contains(key_first, key_last, value_first, stream); + } + + auto cuco_store_contains_device_ref() const + { + return cuco_store_->ref(cuco::experimental::contains); + } + + auto cuco_store_insert_device_ref() const { return cuco_store_->ref(cuco::experimental::insert); } + + key_t invalid_key() const { return cuco_store_->get_empty_key_sentinel(); } + + private: + cuco_store_type const* cuco_store_{}; +}; + +template +class key_binary_search_store_t { + public: + using key_type = key_t; + + key_binary_search_store_t(rmm::cuda_stream_view stream) : store_keys_(0, stream) {} + + template + key_binary_search_store_t( + KeyIterator key_first, + KeyIterator key_last, + bool key_sorted /* if set to true, assume that the input data is sorted and skip sorting (which + is necessary for binary-search) */ + , + rmm::cuda_stream_view stream) + : store_keys_(static_cast(thrust::distance(key_first, key_last)), stream) + { + thrust::copy(rmm::exec_policy(stream), key_first, key_last, store_keys_.begin()); + if (!key_sorted) { + thrust::sort(rmm::exec_policy(stream), store_keys_.begin(), store_keys_.end()); + } + } + + key_binary_search_store_t( + rmm::device_uvector&& keys, + bool key_sorted /* if set to true, assume that the input data is sorted and skip sorting (which + is necessary for binary-search) */ + , + rmm::cuda_stream_view stream) + : store_keys_(std::move(keys)) + { + if (!key_sorted) { + thrust::sort(rmm::exec_policy(stream), store_keys_.begin(), store_keys_.end()); + } + } + + auto release(rmm::cuda_stream_view stream) + { + auto tmp_store_keys = std::move(store_keys_); + store_keys_ = rmm::device_uvector(0, stream); + return tmp_store_keys; + } + + key_t const* store_key_first() const { return store_keys_.cbegin(); } + + key_t const* store_key_last() const { return store_keys_.cend(); } + + size_t size() const { return store_keys_.size(); } + + size_t capacity() const { return store_keys_.size(); } + + private: + rmm::device_uvector store_keys_; +}; + +template +class key_cuco_store_t { + public: + using key_type = key_t; + + using cuco_store_type = cuco::experimental::static_set< + key_t, + cuco::experimental::extent, + cuda::thread_scope_device, + thrust::equal_to, + cuco::experimental::linear_probing<1, // CG size + cuco::murmurhash3_32>, + rmm::mr::stream_allocator_adaptor>>; + + key_cuco_store_t(rmm::cuda_stream_view stream) {} + + key_cuco_store_t(size_t capacity, key_t invalid_key, rmm::cuda_stream_view stream) + { + allocate(capacity, invalid_key, stream); + capacity_ = capacity; + size_ = 0; + } + + template + key_cuco_store_t(KeyIterator key_first, + KeyIterator key_last, + key_t invalid_key, + rmm::cuda_stream_view stream) + { + auto num_keys = static_cast(thrust::distance(key_first, key_last)); + allocate(num_keys, invalid_key, stream); + capacity_ = num_keys; + size_ = 0; + + insert(key_first, key_last, stream); + } + + template + void insert(KeyIterator key_first, KeyIterator key_last, rmm::cuda_stream_view stream) + { + auto num_keys = static_cast(thrust::distance(key_first, key_last)); + if (num_keys == 0) return; + + size_ += cuco_store_->insert(key_first, key_last, stream.value()); + } + + template + void insert_if(KeyIterator key_first, + KeyIterator key_last, + StencilIterator stencil_first, + PredOp pred_op, + rmm::cuda_stream_view stream) + { + auto num_keys = static_cast(thrust::distance(key_first, key_last)); + if (num_keys == 0) return; + + size_ 
+= cuco_store_->insert_if(key_first, key_last, stencil_first, pred_op, stream.value()); + } + + auto release(rmm::cuda_stream_view stream) + { + rmm::device_uvector keys(size(), stream); + auto last = cuco_store_->retrieve_all(keys.begin(), stream.value()); + keys.resize(thrust::distance(keys.begin(), last), stream); + keys.shrink_to_fit(stream); + allocate(0, invalid_key(), stream); + capacity_ = 0; + size_ = 0; + return keys; + } + + cuco_store_type const* cuco_store_ptr() const { return cuco_store_.get(); } + + key_t invalid_key() const { return cuco_store_->empty_key_sentinel(); } + + size_t size() const { return size_; } + + size_t capacity() const { return capacity_; } + + private: + void allocate(size_t num_keys, key_t invalid_key, rmm::cuda_stream_view stream) + { + double constexpr load_factor = 0.7; + auto cuco_size = std::max( + static_cast(static_cast(num_keys) / load_factor), + static_cast(num_keys) + 1); // cuco::static_map requires at least one empty slot + + auto stream_adapter = rmm::mr::make_stream_allocator_adaptor( + rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()), stream); + cuco_store_ = std::make_unique( + cuco_size, + cuco::sentinel::empty_key{invalid_key}, + thrust::equal_to{}, + cuco::experimental::linear_probing<1, // CG size + cuco::murmurhash3_32>{}, + stream_adapter, + stream.value()); + } + + std::unique_ptr cuco_store_{nullptr}; + + size_t capacity_{0}; + size_t size_{0}; // caching as cuco_store_->size() is expensive (this scans the entire slots to + // handle user inserts through a device reference +}; + +} // namespace detail + +/* a class to store keys, the actual storage can either be implemented based on binary tree (when + * use_binary_search == true) or hash-table (cuCollection, when use_binary_search = false) */ +template +class key_store_t { + public: + using key_type = key_t; + + static_assert(std::is_arithmetic_v); + + key_store_t(rmm::cuda_stream_view stream) : store_(stream) {} + + /* when use_binary_search = false */ + template + key_store_t( + size_t capacity /* one can expect good performance till the capacity, the actual underlying + capacity can be larger (for performance & correctness reasons) */ + , + key_t invalid_key /* invalid key shouldn't appear in any *iter in [key_first, key_last) */, + rmm::cuda_stream_view stream, + std::enable_if_t = 0) + : store_(capacity, invalid_key, stream) + { + } + + /* when use_binary_search = true */ + template + key_store_t(KeyIterator key_first, + KeyIterator key_last, + bool key_sorted /* if set to true, assume that the input data is sorted and skip + sorting (which is necessary for binary-search) */ + , + rmm::cuda_stream_view stream, + std::enable_if_t = 0) + : store_(key_first, key_last, key_sorted, stream) + { + } + + /* when use_binary_search = false */ + template + key_store_t( + KeyIterator key_first, + KeyIterator key_last, + key_t invalid_key /* invalid key shouldn't appear in any *iter in [key_first, key_last) */, + rmm::cuda_stream_view stream, + std::enable_if_t = 0) + : store_(key_first, key_last, invalid_key, stream) + { + } + + /* when use_binary_search = true */ + template + key_store_t(rmm::device_uvector&& keys, + bool key_sorted /* if set to true, assume that the input data is sorted and skip + sorting (which is necessary for binary-search) */ + , + rmm::cuda_stream_view stream, + std::enable_if_t = 0) + : store_(std::move(keys), key_sorted, stream) + { + } + + /* when use binary_search = false, this requires that the capacity is large enough */ + template + 
std::enable_if_t insert(KeyIterator key_first, + KeyIterator key_last, + rmm::cuda_stream_view stream) + { + store_.insert(key_first, key_last, stream); + } + + /* when use binary_search = false, this requires that the capacity is large enough */ + template + std::enable_if_t insert_if(KeyIterator key_first, + KeyIterator key_last, + StencilIterator stencil_first, + PredOp pred_op, + rmm::cuda_stream_view stream) + { + store_.insert_if(key_first, key_last, stencil_first, pred_op, stream); + } + + // key_store_t becomes empty after release + auto release(rmm::cuda_stream_view stream) { return store_.release(stream); } + + auto view() const + { + if constexpr (use_binary_search) { + return detail::key_binary_search_store_view_t(store_.store_key_first(), + store_.store_key_last()); + } else { + return detail::key_cuco_store_view_t(store_.cuco_store_ptr()); + } + } + + template + std::enable_if_t invalid_key() const + { + return store_.invalid_key(); + } + + size_t size() const { return store_.size(); } + + size_t capacity() const { return store_.capacity(); } + + private: + std::conditional_t, + detail::key_cuco_store_t> + store_; +}; + +} // namespace cugraph diff --git a/cpp/src/prims/kv_store.cuh b/cpp/src/prims/kv_store.cuh index f20865c92dc..8490bacfd9c 100644 --- a/cpp/src/prims/kv_store.cuh +++ b/cpp/src/prims/kv_store.cuh @@ -109,7 +109,8 @@ struct kv_cuco_insert_and_increment_t { cuda::atomic_ref atomic_counter(*counter); auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); using ref_type = typename cuco_store_type::ref_type; - cuda::atomic_ref ref((*iter).second); + cuda::atomic_ref ref( + (*iter).second); ref.store(idx, cuda::std::memory_order_relaxed); return idx; } else { @@ -148,7 +149,8 @@ struct kv_cuco_insert_if_and_increment_t { cuda::atomic_ref atomic_counter(*counter); auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); using ref_type = typename cuco_store_type::ref_type; - cuda::atomic_ref ref((*iter).second); + cuda::atomic_ref ref( + (*iter).second); ref.store(idx, cuda::std::memory_order_relaxed); return idx; } else { @@ -176,7 +178,8 @@ struct kv_cuco_insert_and_assign_t { auto [iter, inserted] = device_ref.insert_and_find(pair); if (!inserted) { using ref_type = typename cuco_store_type::ref_type; - cuda::atomic_ref ref((*iter).second); + cuda::atomic_ref ref( + (*iter).second); ref.store(thrust::get<1>(pair), cuda::std::memory_order_relaxed); } } diff --git a/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh b/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh index d7c094a2361..b238b964ede 100644 --- a/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh +++ b/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh @@ -810,8 +810,8 @@ per_v_random_select_transform_e(raft::handle_t const& handle, } auto edge_partition_frontier_local_degrees = edge_partition.compute_local_degrees( - raft::device_span(edge_partition_frontier_major_first, - local_frontier_sizes[i]), + edge_partition_frontier_major_first, + edge_partition_frontier_major_first + local_frontier_sizes[i], handle.get_stream()); if (minor_comm_size > 1) { diff --git a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh index 7216eed1186..745f1a8fd8e 100644 --- a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh +++ b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh @@ -225,16 +225,14 @@ size_t 
compute_num_out_nbrs_from_frontier(raft::handle_t const& handle, static_cast(i), handle.get_stream()); - ret += edge_partition.compute_number_of_edges( - raft::device_span(edge_partition_frontier_vertices.begin(), - edge_partition_frontier_vertices.end()), - handle.get_stream()); + ret += edge_partition.compute_number_of_edges(edge_partition_frontier_vertices.begin(), + edge_partition_frontier_vertices.end(), + handle.get_stream()); } else { assert(i == 0); - ret += edge_partition.compute_number_of_edges( - raft::device_span(local_frontier_vertex_first, - local_frontier_vertex_first + frontier.size()), - handle.get_stream()); + ret += edge_partition.compute_number_of_edges(local_frontier_vertex_first, + local_frontier_vertex_first + frontier.size(), + handle.get_stream()); } } @@ -289,7 +287,8 @@ size_t compute_num_out_nbrs_from_frontier(raft::handle_t const& handle, * known member variables) to take a more optimized code path. See the documentation in the * reduce_op.cuh file for instructions on writing custom reduction operators. * @return Tuple of key values and payload values (if ReduceOp::value_type is not void) or just key - * values (if ReduceOp::value_type is void). + * values (if ReduceOp::value_type is void). Keys in the return values are sorted in ascending order + * using a vertex ID as the primary key and a tag (if relevant) as the secondary key. */ template get_stream()); - merged_vertices.shrink_to_fit(handle_ptr_->get_stream()); vertices_ = std::move(merged_vertices); } else { auto cur_size = vertices_.size(); @@ -204,8 +203,6 @@ class key_bucket_t { merged_pair_first + merged_vertices.size())), handle_ptr_->get_stream()); merged_tags.resize(merged_vertices.size(), handle_ptr_->get_stream()); - merged_vertices.shrink_to_fit(handle_ptr_->get_stream()); - merged_tags.shrink_to_fit(handle_ptr_->get_stream()); vertices_ = std::move(merged_vertices); tags_ = std::move(merged_tags); } else { @@ -283,6 +280,14 @@ class key_bucket_t { auto end() { return begin() + vertices_.size(); } + auto const vertex_begin() const { return vertices_.begin(); } + + auto const vertex_end() const { return vertices_.end(); } + + auto vertex_begin() { return vertices_.begin(); } + + auto vertex_end() { return vertices_.end(); } + private: raft::handle_t const* handle_ptr_{nullptr}; rmm::device_uvector vertices_; @@ -337,7 +342,10 @@ class vertex_frontier_t { // 1. apply split_op to each bucket element - assert(buckets_.size() <= std::numeric_limits::max()); + CUGRAPH_EXPECTS(buckets_.size() <= std::numeric_limits::max(), + "Invalid input arguments: the current implementation assumes that bucket " + "indices can be represented with uint8_t."); + rmm::device_uvector bucket_indices(this_bucket.size(), handle_ptr_->get_stream()); thrust::transform( handle_ptr_->get_thrust_policy(), @@ -349,28 +357,10 @@ class vertex_frontier_t { return static_cast(split_op_result ? *split_op_result : kInvalidBucketIdx); }); - // 2. remove elements with the invalid bucket indices + // 2. 
separte the elements to stay in this bucket from the elements to be moved to other buckets auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), this_bucket.begin())); - bucket_indices.resize( - thrust::distance(pair_first, - thrust::remove_if(handle_ptr_->get_thrust_policy(), - pair_first, - pair_first + bucket_indices.size(), - [] __device__(auto pair) { - return thrust::get<0>(pair) == - static_cast(kInvalidBucketIdx); - })), - handle_ptr_->get_stream()); - this_bucket.resize(bucket_indices.size()); - bucket_indices.shrink_to_fit(handle_ptr_->get_stream()); - this_bucket.shrink_to_fit(); - - // 3. separte the elements to stay in this bucket from the elements to be moved to other buckets - - pair_first = - thrust::make_zip_iterator(thrust::make_tuple(bucket_indices.begin(), this_bucket.begin())); auto new_this_bucket_size = static_cast(thrust::distance( pair_first, thrust::stable_partition( // stable_partition to maintain sorted order within each bucket @@ -381,6 +371,21 @@ class vertex_frontier_t { return thrust::get<0>(pair) == this_bucket_idx; }))); + // 3. remove elements with the invalid bucket indices + + bucket_indices.resize( + new_this_bucket_size + + thrust::distance(pair_first + new_this_bucket_size, + thrust::remove_if(handle_ptr_->get_thrust_policy(), + pair_first + new_this_bucket_size, + pair_first + bucket_indices.size(), + [] __device__(auto pair) { + return thrust::get<0>(pair) == + static_cast(kInvalidBucketIdx); + })), + handle_ptr_->get_stream()); + this_bucket.resize(bucket_indices.size()); + // 4. insert to target buckets and resize this bucket insert_to_buckets(bucket_indices.begin() + new_this_bucket_size, diff --git a/cpp/src/traversal/od_shortest_distances_impl.cuh b/cpp/src/traversal/od_shortest_distances_impl.cuh new file mode 100644 index 00000000000..09e41466393 --- /dev/null +++ b/cpp/src/traversal/od_shortest_distances_impl.cuh @@ -0,0 +1,1092 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +CUCO_DECLARE_BITWISE_COMPARABLE(float) +CUCO_DECLARE_BITWISE_COMPARABLE(double) + +namespace cugraph { + +namespace { + +template +struct aggregate_vi_t { + tag_t num_origins{}; + + __device__ key_t operator()(thrust::tuple tup) const + { + return (static_cast(thrust::get<0>(tup)) * static_cast(num_origins)) + + static_cast(thrust::get<1>(tup)); + } +}; + +template +struct split_vi_t { + tag_t num_origins{}; + + __device__ thrust::tuple operator()(key_t aggregated_vi) const + { + return thrust::make_tuple( + static_cast(aggregated_vi / static_cast(num_origins)), + static_cast(aggregated_vi % static_cast(num_origins))); + } +}; + +template +struct extract_v_t { + tag_t num_origins{}; + + __device__ vertex_t operator()(key_t aggregated_vi) const + { + return static_cast(aggregated_vi / static_cast(num_origins)); + } +}; + +template +struct update_v_to_destination_index_t { + raft::device_span destinations{}; + raft::device_span v_to_destination_indices{}; + + __device__ void operator()(od_idx_t i) const { v_to_destination_indices[destinations[i]] = i; } +}; + +template +struct compute_od_matrix_index_t { + raft::device_span v_to_destination_indices{}; + tag_t num_origins{}; + tag_t num_destinations{}; + + __device__ size_t operator()(key_t aggregated_vi) const + { + auto v = static_cast(aggregated_vi / static_cast(num_origins)); + auto tag = static_cast(aggregated_vi % static_cast(num_origins)); + return (tag * static_cast(num_destinations)) + + static_cast(v_to_destination_indices[v]); + } +}; + +template +struct check_destination_index_t { + raft::device_span v_to_destination_indices{}; + tag_t num_origins{}; + tag_t invalid_od_idx{}; + + __device__ bool operator()(key_t aggregated_vi) const + { + auto v = static_cast(aggregated_vi / static_cast(num_origins)); + return (v_to_destination_indices[v] != invalid_od_idx); + } +}; + +template +struct e_op_t { + detail::kv_cuco_store_find_device_view_t> + key_to_dist_map{}; + tag_t num_origins{}; + weight_t cutoff{}; + weight_t invalid_distance{}; + + __device__ thrust::optional> operator()( + thrust::tuple tagged_src, + vertex_t dst, + thrust::nullopt_t, + thrust::nullopt_t, + weight_t w) const + { + aggregate_vi_t aggregator{num_origins}; + + auto src_val = key_to_dist_map.find(aggregator(tagged_src)); + assert(src_val != invalid_distance); + auto origin_idx = thrust::get<1>(tagged_src); + auto new_distance = src_val + w; + auto threshold = cutoff; + auto dst_val = key_to_dist_map.find(aggregator(thrust::make_tuple(dst, origin_idx))); + if (dst_val != invalid_distance) { threshold = dst_val < threshold ? dst_val : threshold; } + return (new_distance < threshold) + ? 
thrust::optional>{thrust::make_tuple(origin_idx, + new_distance)} + : thrust::nullopt; + } +}; + +template +struct insert_nbr_key_t { + raft::device_span offsets{}; + raft::device_span indices{}; + detail::key_cuco_store_insert_device_view_t> key_set{}; + aggregate_vi_t aggregator{}; + + __device__ void operator()(thrust::tuple vi) + { + auto v = thrust::get<0>(vi); + auto idx = thrust::get<1>(vi); + + for (edge_t nbr_offset = offsets[v]; nbr_offset < offsets[v + 1]; ++nbr_offset) { + auto nbr = indices[nbr_offset]; + key_set.insert(aggregator(thrust::make_tuple(nbr, idx))); + } + } +}; + +template +struct keep_t { + weight_t old_near_far_threshold{}; + detail::key_cuco_store_contains_device_view_t> key_set{}; + + __device__ bool operator()(thrust::tuple pair) const + { + return (thrust::get<1>(pair) >= old_near_far_threshold) || + (key_set.contains(thrust::get<0>(pair))); + } +}; + +template +struct is_no_smaller_than_threshold_t { + weight_t threshold{}; + detail::kv_cuco_store_find_device_view_t> + key_to_dist_map{}; + + __device__ bool operator()(key_t key) const { return key_to_dist_map.find(key) >= threshold; } +}; + +size_t compute_kv_store_capacity(size_t new_min_size, + size_t old_capacity, + size_t max_capacity_increment) +{ + if (new_min_size <= old_capacity) { + return old_capacity; // shrinking the kv_store has little impact in reducing the peak memory + // usage (it may have limited benefit in improving cache hit ratio at the + // cost of more collisions) + } else { + return old_capacity + raft::round_up_safe(new_min_size - old_capacity, max_capacity_increment); + } +} + +int32_t constexpr multi_partition_copy_block_size = 512; // tuning parameter + +template +__global__ void multi_partition_copy( + InputIterator input_first, + InputIterator input_last, + raft::device_span output_buffer_ptrs, + PartitionOp partition_op, // returns max_num_partitions to discard + raft::device_span partition_counters) +{ + static_assert(max_num_partitions <= static_cast(std::numeric_limits::max())); + assert(output_buffer_ptrs.size() == partition_counters.size()); + int32_t num_partitions = output_buffer_ptrs.size(); + assert(num_partitions <= max_num_partitions); + + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto idx = static_cast(tid); + + int32_t constexpr tmp_buffer_size = + max_num_partitions; // tuning parameter (trade-off between parallelism & memory vs # BlockScan + // & atomic operations) + static_assert( + static_cast(multi_partition_copy_block_size) * static_cast(tmp_buffer_size) <= + static_cast( + std::numeric_limits::max())); // int32_t is sufficient to store the maximum possible + // number of updates per block + + using BlockScan = cub::BlockScan; + __shared__ typename BlockScan::TempStorage temp_storage; + + __shared__ size_t block_start_offsets[max_num_partitions]; + + static_assert(tmp_buffer_size <= static_cast(std::numeric_limits::max())); + uint8_t tmp_counts[max_num_partitions]; + int32_t tmp_intra_block_offsets[max_num_partitions]; + + uint8_t tmp_partitions[tmp_buffer_size]; + uint8_t tmp_offsets[tmp_buffer_size]; + + auto num_elems = static_cast(thrust::distance(input_first, input_last)); + auto rounded_up_num_elems = + ((num_elems + static_cast(blockDim.x - 1)) / static_cast(blockDim.x)) * + static_cast(blockDim.x); + while (idx < rounded_up_num_elems) { + thrust::fill(thrust::seq, tmp_counts, tmp_counts + num_partitions, int32_t{0}); + auto tmp_idx = idx; + for (int32_t i = 0; i < tmp_buffer_size; ++i) { + if (tmp_idx < num_elems) { + auto partition = 
partition_op(*(input_first + tmp_idx)); + tmp_partitions[i] = partition; + tmp_offsets[i] = tmp_counts[partition]; + ++tmp_counts[partition]; + } + tmp_idx += gridDim.x * blockDim.x; + } + for (int32_t i = 0; i < num_partitions; ++i) { + BlockScan(temp_storage) + .ExclusiveSum(static_cast(tmp_counts[i]), tmp_intra_block_offsets[i]); + } + if (threadIdx.x == (blockDim.x - 1)) { + for (int32_t i = 0; i < num_partitions; ++i) { + auto increment = static_cast(tmp_intra_block_offsets[i] + tmp_counts[i]); + cuda::atomic_ref atomic_counter(partition_counters[i]); + block_start_offsets[i] = + atomic_counter.fetch_add(increment, cuda::std::memory_order_relaxed); + } + } + __syncthreads(); + tmp_idx = idx; + for (int32_t i = 0; i < tmp_buffer_size; ++i) { + if (tmp_idx < num_elems) { + auto partition = tmp_partitions[i]; + if (partition != static_cast(max_num_partitions)) { + auto offset = block_start_offsets[partition] + + static_cast(tmp_intra_block_offsets[partition] + tmp_offsets[i]); + *(output_buffer_ptrs[partition] + offset) = thrust::get<0>(*(input_first + tmp_idx)); + } + } + tmp_idx += gridDim.x * blockDim.x; + } + + idx += static_cast(gridDim.x * blockDim.x) * tmp_buffer_size; + } +} + +template +kv_store_t filter_key_to_dist_map( + raft::handle_t const& handle, + GraphViewType const& graph_view, + kv_store_t&& key_to_dist_map, + key_bucket_t const& near_bucket, + std::vector> const& far_buffers, + weight_t old_near_far_threshold, + size_t min_extra_capacity, // ensure at least extra_capacity elements can be inserted + size_t kv_store_capacity_increment, + size_t num_origins) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + + auto invalid_key = key_to_dist_map.invalid_key(); + auto invalid_distance = key_to_dist_map.invalid_value(); + + auto old_kv_store_capacity = key_to_dist_map.capacity(); + + rmm::device_uvector old_key_buffer(0, handle.get_stream()); + rmm::device_uvector old_value_buffer(0, handle.get_stream()); + rmm::device_uvector keep_flags(0, handle.get_stream()); + size_t keep_count{0}; + { + std::tie(old_key_buffer, old_value_buffer) = key_to_dist_map.release(handle.get_stream()); + keep_flags.resize(old_key_buffer.size(), handle.get_stream()); + + // FIXME: better use a higher-level interface than this. 
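// size a temporary key_set by the number of edges incident to the still-active keys (the near
// bucket plus every far buffer); the set records each (neighbor, origin) pair that may still be
// looked up, and only those entries, plus entries at or above old_near_far_threshold, survive
// the rebuild of key_to_dist_map below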
+ auto edge_partition = + edge_partition_device_view_t( + graph_view.local_edge_partition_view(0)); + auto num_edges = edge_partition.compute_number_of_edges( + near_bucket.vertex_begin(), near_bucket.vertex_end(), handle.get_stream()); + for (size_t i = 0; i < far_buffers.size(); ++i) { + auto far_vertex_first = thrust::make_transform_iterator( + far_buffers[i].begin(), + extract_v_t{static_cast(num_origins)}); + num_edges += edge_partition.compute_number_of_edges( + far_vertex_first, far_vertex_first + far_buffers[i].size(), handle.get_stream()); + } + + auto key_set = key_store_t( + num_edges, invalid_key, handle.get_stream()); + + // FIXME: better implement this using a primitive + auto offsets = graph_view.local_edge_partition_view().offsets(); + auto indices = graph_view.local_edge_partition_view().indices(); + + thrust::for_each(handle.get_thrust_policy(), + near_bucket.begin(), + near_bucket.end(), + insert_nbr_key_t{ + offsets, + indices, + detail::key_cuco_store_insert_device_view_t(key_set.view()), + aggregate_vi_t{static_cast(num_origins)}}); + + for (size_t i = 0; i < far_buffers.size(); ++i) { + auto far_vi_first = thrust::make_transform_iterator( + far_buffers[i].begin(), + split_vi_t{static_cast(num_origins)}); + thrust::for_each(handle.get_thrust_policy(), + far_vi_first, + far_vi_first + far_buffers[i].size(), + insert_nbr_key_t{ + offsets, + indices, + detail::key_cuco_store_insert_device_view_t(key_set.view()), + aggregate_vi_t{static_cast(num_origins)}}); + } + + auto old_kv_pair_first = + thrust::make_zip_iterator(old_key_buffer.begin(), old_value_buffer.begin()); + auto old_kv_pair_last = thrust::transform( + handle.get_thrust_policy(), + old_kv_pair_first, + old_kv_pair_first + old_key_buffer.size(), + keep_flags.begin(), + keep_t{old_near_far_threshold, + detail::key_cuco_store_contains_device_view_t(key_set.view())}); + + keep_count = thrust::count_if( + handle.get_thrust_policy(), keep_flags.begin(), keep_flags.end(), thrust::identity{}); + } + + size_t new_kv_store_capacity = compute_kv_store_capacity( + keep_count + min_extra_capacity, old_kv_store_capacity, kv_store_capacity_increment); + + key_to_dist_map = kv_store_t( + new_kv_store_capacity, invalid_key, invalid_distance, handle.get_stream()); + + key_to_dist_map.insert_if(old_key_buffer.begin(), + old_key_buffer.end(), + old_value_buffer.begin(), + keep_flags.begin(), + thrust::identity{}, + handle.get_stream()); + + return std::move(key_to_dist_map); +} + +template +rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + GraphViewType const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + weight_t cutoff, + weight_t delta, + bool do_expensive_check) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using key_t = uint64_t; + using od_idx_t = uint32_t; // origin/destination idx + + static_assert(std::is_integral::value, + "GraphViewType::vertex_type should be integral."); + static_assert(!GraphViewType::is_storage_transposed, + "GraphViewType should support the push model."); + static_assert(!GraphViewType::is_multi_gpu, "We currently do not support multi-GPU."); + + // concurrently runs multiple instances of the Near-Far Pile method in + // A. Davidson, S. Baxter, M. Garland, and J. D. Owens, "Work-efficient parallel GPU methods for + // single-source shortest paths," 2014. + + // 1. 
check input arguments + + auto const num_vertices = graph_view.number_of_vertices(); + auto const num_edges = graph_view.number_of_edges(); + + CUGRAPH_EXPECTS(num_vertices != 0 || (origins.size() == 0 && destinations.size() == 0), + "Invalid input argument: the input graph is empty but origins.size() > 0 or " + "destinations.size() > 0."); + + CUGRAPH_EXPECTS( + static_cast(num_vertices) * origins.size() <= std::numeric_limits::max(), + "Invalid input arguments: graph_view.number_of_vertices() * origins.size() too large, the " + "current implementation assumes that a vertex ID and a origin index can be packed in a 64 " + "bit value."); + + CUGRAPH_EXPECTS(origins.size() <= std::numeric_limits::max() && + destinations.size() <= std::numeric_limits::max(), + "Invalid input arguments: origins.size() or destinations.size() too large, the " + "current implementation assumes that the origin/destination index can be " + "represented using a 32 bit value."); + + if (do_expensive_check) { + auto num_negative_edge_weights = + count_if_e(handle, + graph_view, + edge_src_dummy_property_t{}.view(), + edge_dst_dummy_property_t{}.view(), + edge_weight_view, + [] __device__(vertex_t, vertex_t, auto, auto, weight_t w) { return w < 0.0; }); + CUGRAPH_EXPECTS(num_negative_edge_weights == 0, + "Invalid input argument: input edge weights should have non-negative values."); + + auto num_invalid_origins = thrust::count_if( + handle.get_thrust_policy(), + origins.begin(), + origins.end(), + [num_vertices] __device__(auto v) { return !is_valid_vertex(num_vertices, v); }); + auto num_invalid_destinations = thrust::count_if( + handle.get_thrust_policy(), + destinations.begin(), + destinations.end(), + [num_vertices] __device__(auto v) { return !is_valid_vertex(num_vertices, v); }); + CUGRAPH_EXPECTS(num_invalid_origins == 0, + "Invalid input arguments: origins contains invalid vertex IDs."); + CUGRAPH_EXPECTS(num_invalid_destinations == 0, + "Invalid input arguments: destinations contains invalid vertex IDs."); + + rmm::device_uvector tmp_origins(origins.size(), handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), origins.begin(), origins.end(), tmp_origins.begin()); + thrust::sort(handle.get_thrust_policy(), tmp_origins.begin(), tmp_origins.end()); + CUGRAPH_EXPECTS( + thrust::unique(handle.get_thrust_policy(), tmp_origins.begin(), tmp_origins.end()) == + tmp_origins.end(), + "Invalid input arguments: origins should not have duplicates."); + + rmm::device_uvector tmp_destinations(destinations.size(), handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + destinations.begin(), + destinations.end(), + tmp_destinations.begin()); + thrust::sort(handle.get_thrust_policy(), tmp_destinations.begin(), tmp_destinations.end()); + CUGRAPH_EXPECTS(thrust::unique(handle.get_thrust_policy(), + tmp_destinations.begin(), + tmp_destinations.end()) == tmp_destinations.end(), + "Invalid input arguments: destinations should not have duplicates."); + } + + // 2. 
set performance tuning parameters + + size_t constexpr num_far_buffers{5}; + + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + + size_t key_buffer_capacity_increment = origins.size() * size_t{1024}; + size_t init_far_buffer_size = origins.size() * size_t{1024}; + + size_t kv_store_capacity_increment = raft::round_up_safe( + std::max(static_cast((static_cast(total_global_mem) * 0.01) / + static_cast(sizeof(key_t) + sizeof(weight_t))), + size_t{1}), + size_t{1} << 12); + auto init_kv_store_size = + std::min(static_cast((static_cast(num_vertices) * 0.001) * + static_cast(origins.size())), + kv_store_capacity_increment * 25); + init_kv_store_size = std::max(init_kv_store_size, origins.size()); + + auto target_near_q_size = + static_cast(handle.get_device_properties().multiProcessorCount) * + size_t{32 * 1024}; // increase the step size if the near queue size is smaller than the target + // size (up to num_far_buffers * delta) + + // 3. initialize od_matrix & v_to_destination_indices + + auto constexpr invalid_distance = + std::numeric_limits::lowest(); // no valid distance can be negative + auto constexpr invalid_vertex = invalid_vertex_id::value; + auto constexpr invalid_od_idx = std::numeric_limits::max(); + auto constexpr invalid_key = std::numeric_limits::max(); + + auto od_matrix_size = origins.size() * destinations.size(); + rmm::device_uvector od_matrix(od_matrix_size, handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), + od_matrix.begin(), + od_matrix.end(), + std::numeric_limits::max()); + + if (num_vertices == 0 || num_edges == 0 || od_matrix.size() == 0) { return od_matrix; } + + rmm::device_uvector v_to_destination_indices(num_vertices, handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), + v_to_destination_indices.begin(), + v_to_destination_indices.end(), + invalid_od_idx); + thrust::for_each(handle.get_thrust_policy(), + thrust::make_counting_iterator(od_idx_t{0}), + thrust::make_counting_iterator(static_cast(destinations.size())), + update_v_to_destination_index_t{ + destinations, + raft::device_span(v_to_destination_indices.data(), + v_to_destination_indices.size())}); + + // 4. initialize SSSP frontier + + constexpr size_t bucket_idx_near = 0; + constexpr size_t num_buckets = 1; + + vertex_frontier_t + vertex_frontier(handle, num_buckets); + + std::vector> far_buffers{}; + far_buffers.reserve(num_far_buffers); + for (size_t i = 0; i < num_far_buffers; ++i) { + rmm::device_uvector buffer(0, handle.get_stream()); + buffer.reserve(init_far_buffer_size, handle.get_stream()); + far_buffers.push_back(std::move(buffer)); + } + + auto init_kv_store_capacity = + compute_kv_store_capacity(init_kv_store_size, size_t{0}, kv_store_capacity_increment); + + kv_store_t key_to_dist_map( + init_kv_store_capacity, invalid_key, invalid_distance, handle.get_stream()); + + // 5. 
initialize vertex_frontier & key_to_dist_map, and update od_matrix + + { + auto tagged_origin_first = + thrust::make_zip_iterator(origins.begin(), thrust::make_counting_iterator(od_idx_t{0})); + vertex_frontier.bucket(bucket_idx_near) + .insert(tagged_origin_first, tagged_origin_first + origins.size()); + + auto key_first = thrust::make_transform_iterator( + tagged_origin_first, + aggregate_vi_t{static_cast(origins.size())}); + key_to_dist_map.insert(key_first, + key_first + origins.size(), + thrust::make_constant_iterator(weight_t{0.0}), + handle.get_stream()); + + thrust::transform_if(handle.get_thrust_policy(), + thrust::make_constant_iterator(weight_t{0.0}), + thrust::make_constant_iterator(weight_t{0.0}) + origins.size(), + key_first, + thrust::make_permutation_iterator( + od_matrix.begin(), + thrust::make_transform_iterator( + key_first, + compute_od_matrix_index_t{ + raft::device_span(v_to_destination_indices.data(), + v_to_destination_indices.size()), + static_cast(origins.size()), + static_cast(destinations.size())})), + thrust::identity{}, + check_destination_index_t{ + raft::device_span(v_to_destination_indices.data(), + v_to_destination_indices.size()), + static_cast(origins.size()), + invalid_od_idx}); + } + + // 6. SSSP iteration + + auto old_near_far_threshold = weight_t{0.0}; + auto near_far_threshold = delta; + std::vector next_far_thresholds(num_far_buffers - 1); + for (size_t i = 0; i < next_far_thresholds.size(); ++i) { + next_far_thresholds[i] = + (i == 0) ? (near_far_threshold + delta) : next_far_thresholds[i - 1] + delta; + } + size_t prev_near_far_threshold_num_inserts{0}; + size_t prev_num_near_q_insert_buffers{1}; + while (true) { + // 6-1. enumerate next frontier candidates + + rmm::device_uvector new_frontier_keys(0, handle.get_stream()); + rmm::device_uvector distance_buffer(0, handle.get_stream()); + { + // use detail space functions as sort_by_key with key = key_t is faster than key = + // thrust::tuple and we need to convert thrust::tuple + // to key_t anyways for post processing + + auto e_op = e_op_t{ + detail::kv_cuco_store_find_device_view_t(key_to_dist_map.view()), + static_cast(origins.size()), + cutoff, + invalid_distance}; + detail::call_e_op_t, + weight_t, + vertex_t, + thrust::nullopt_t, + thrust::nullopt_t, + weight_t, + e_op_t> + e_op_wrapper{e_op}; + + auto new_frontier_tagged_vertex_buffer = + allocate_dataframe_buffer>(0, handle.get_stream()); + std::tie(new_frontier_tagged_vertex_buffer, distance_buffer) = + detail::extract_transform_v_frontier_e, weight_t>( + handle, + graph_view, + vertex_frontier.bucket(bucket_idx_near), + edge_src_dummy_property_t{}.view(), + edge_dst_dummy_property_t{}.view(), + edge_weight_view, + e_op_wrapper); + + new_frontier_keys.resize(size_dataframe_buffer(new_frontier_tagged_vertex_buffer), + handle.get_stream()); + auto key_first = thrust::make_transform_iterator( + get_dataframe_buffer_begin(new_frontier_tagged_vertex_buffer), + aggregate_vi_t{static_cast(origins.size())}); + thrust::copy(handle.get_thrust_policy(), + key_first, + key_first + size_dataframe_buffer(new_frontier_tagged_vertex_buffer), + new_frontier_keys.begin()); + resize_dataframe_buffer(new_frontier_tagged_vertex_buffer, 0, handle.get_stream()); + shrink_to_fit_dataframe_buffer(new_frontier_tagged_vertex_buffer, handle.get_stream()); + + std::tie(new_frontier_keys, distance_buffer) = + detail::sort_and_reduce_buffer_elements>( + handle, + std::move(new_frontier_keys), + std::move(distance_buffer), + reduce_op::minimum()); + } + 
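// the near bucket has been fully expanded above; clear it now and refill it in step 6-4 with the
// new frontier keys whose distances fall below the current near/far threshold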
vertex_frontier.bucket(bucket_idx_near).clear(); + + // 6-2. update key_to_dist_map + + { + auto new_min_capacity = key_to_dist_map.size() + new_frontier_keys.size(); + if (key_to_dist_map.capacity() < + new_min_capacity) { // note that this is conservative as some keys may already exist in + // key_to_dist_map + auto new_kv_store_capacity = compute_kv_store_capacity( + new_min_capacity, key_to_dist_map.capacity(), kv_store_capacity_increment); + auto [old_key_buffer, old_value_buffer] = key_to_dist_map.release(handle.get_stream()); + key_to_dist_map = kv_store_t( + new_kv_store_capacity, invalid_key, invalid_distance, handle.get_stream()); + key_to_dist_map.insert(get_dataframe_buffer_begin(old_key_buffer), + get_dataframe_buffer_end(old_key_buffer), + old_value_buffer.begin(), + handle.get_stream()); + } + key_to_dist_map.insert_and_assign(new_frontier_keys.begin(), + new_frontier_keys.end(), + distance_buffer.begin(), + handle.get_stream()); + prev_near_far_threshold_num_inserts += new_frontier_keys.size(); + } + + // 6-3. update od_matrix + + { + thrust::transform_if(handle.get_thrust_policy(), + distance_buffer.begin(), + distance_buffer.end(), + new_frontier_keys.begin(), + thrust::make_permutation_iterator( + od_matrix.begin(), + thrust::make_transform_iterator( + new_frontier_keys.begin(), + compute_od_matrix_index_t{ + raft::device_span(v_to_destination_indices.data(), + v_to_destination_indices.size()), + static_cast(origins.size()), + static_cast(destinations.size())})), + thrust::identity{}, + check_destination_index_t{ + raft::device_span(v_to_destination_indices.data(), + v_to_destination_indices.size()), + static_cast(origins.size()), + invalid_od_idx}); + } + + // 6-4. update the queues + + { + std::vector h_split_thresholds( + num_far_buffers); // total # buffers = 1 (near queue) + num_far_buffers + h_split_thresholds[0] = near_far_threshold; + for (size_t i = 1; i < h_split_thresholds.size(); ++i) { + h_split_thresholds[i] = next_far_thresholds[i - 1]; + } + rmm::device_uvector d_split_thresholds(h_split_thresholds.size(), + handle.get_stream()); + raft::update_device(d_split_thresholds.data(), + h_split_thresholds.data(), + h_split_thresholds.size(), + handle.get_stream()); + + rmm::device_uvector d_counters(d_split_thresholds.size() + 1, handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), d_counters.begin(), d_counters.end(), size_t{0}); + + auto num_tagged_vertices = new_frontier_keys.size(); + rmm::device_uvector tmp_near_q_keys(0, handle.get_stream()); + tmp_near_q_keys.reserve(num_tagged_vertices, handle.get_stream()); + + std::vector old_far_buffer_sizes(num_far_buffers); + for (size_t i = 0; i < num_far_buffers; ++i) { + old_far_buffer_sizes[i] = far_buffers[i].size(); + } + + auto input_first = + thrust::make_zip_iterator(new_frontier_keys.begin(), distance_buffer.begin()); + + size_t num_copied{0}; + while (num_copied < num_tagged_vertices) { + auto this_loop_size = + std::min(key_buffer_capacity_increment, num_tagged_vertices - num_copied); + + std::vector h_buffer_ptrs(d_counters.size()); + tmp_near_q_keys.resize(tmp_near_q_keys.size() + this_loop_size, handle.get_stream()); + h_buffer_ptrs[0] = tmp_near_q_keys.data(); + for (size_t i = 0; i < num_far_buffers; ++i) { + if (far_buffers[i].size() + this_loop_size > far_buffers[i].capacity()) { + far_buffers[i].reserve( + far_buffers[i].capacity() + + raft::round_up_safe(this_loop_size, key_buffer_capacity_increment), + handle.get_stream()); + } + far_buffers[i].resize(far_buffers[i].size() + 
this_loop_size, handle.get_stream()); + h_buffer_ptrs[i + 1] = far_buffers[i].data() + old_far_buffer_sizes[i]; + } + rmm::device_uvector d_buffer_ptrs(h_buffer_ptrs.size(), handle.get_stream()); + raft::update_device( + d_buffer_ptrs.data(), h_buffer_ptrs.data(), h_buffer_ptrs.size(), handle.get_stream()); + + raft::grid_1d_thread_t update_grid(this_loop_size, + multi_partition_copy_block_size, + handle.get_device_properties().maxGridSize[0]); + multi_partition_copy(1 /* near queue */ + num_far_buffers)> + <<>>( + input_first + num_copied, + input_first + num_copied + this_loop_size, + raft::device_span(d_buffer_ptrs.data(), d_buffer_ptrs.size()), + [split_thresholds = raft::device_span( + d_split_thresholds.data(), d_split_thresholds.size())] __device__(auto pair) { + return static_cast( + thrust::distance(split_thresholds.begin(), + thrust::upper_bound(thrust::seq, + split_thresholds.begin(), + split_thresholds.end(), + thrust::get<1>(pair)))); + }, + raft::device_span(d_counters.data(), d_counters.size())); + + std::vector h_counters(d_counters.size()); + raft::update_host( + h_counters.data(), d_counters.data(), d_counters.size(), handle.get_stream()); + handle.sync_stream(); + + tmp_near_q_keys.resize(h_counters[0], handle.get_stream()); + for (size_t i = 0; i < num_far_buffers; ++i) { + far_buffers[i].resize(old_far_buffer_sizes[i] + h_counters[i + 1], handle.get_stream()); + } + + num_copied += this_loop_size; + } + + thrust::sort(handle.get_thrust_policy(), tmp_near_q_keys.begin(), tmp_near_q_keys.end()); + tmp_near_q_keys.resize(thrust::distance(tmp_near_q_keys.begin(), + thrust::unique(handle.get_thrust_policy(), + tmp_near_q_keys.begin(), + tmp_near_q_keys.end())), + handle.get_stream()); + auto near_vi_first = thrust::make_transform_iterator( + tmp_near_q_keys.begin(), + split_vi_t{static_cast(origins.size())}); + vertex_frontier.bucket(bucket_idx_near) + .insert(near_vi_first, near_vi_first + tmp_near_q_keys.size()); + } + + new_frontier_keys.resize(0, handle.get_stream()); + distance_buffer.resize(0, handle.get_stream()); + new_frontier_keys.shrink_to_fit(handle.get_stream()); + distance_buffer.shrink_to_fit(handle.get_stream()); + + if (vertex_frontier.bucket(bucket_idx_near).aggregate_size() > 0) { + /* nothing to do */ + } else { + auto num_aggregate_far_keys = far_buffers[0].size(); + for (size_t i = 1; i < num_far_buffers; ++i) { + num_aggregate_far_keys += far_buffers[i].size(); + } + + if (num_aggregate_far_keys > 0) { // near queue is empty, split the far queue + std::vector invalid_thresholds(num_far_buffers); + for (size_t i = 0; i < invalid_thresholds.size(); ++i) { + invalid_thresholds[i] = (i == 0) ? near_far_threshold : next_far_thresholds[i - 1]; + } + + size_t num_near_q_insert_buffers{0}; + size_t near_size{0}; + size_t tot_far_size{0}; + do { + num_near_q_insert_buffers = 0; + size_t max_near_q_inserts{0}; + old_near_far_threshold = near_far_threshold; + do { + near_far_threshold = + (num_far_buffers > 1) ? next_far_thresholds[0] : (near_far_threshold + delta); + for (size_t i = 0; i < next_far_thresholds.size(); ++i) { + next_far_thresholds[i] = (i < next_far_thresholds.size() - 1) + ? 
next_far_thresholds[i + 1] + : (next_far_thresholds[i] + delta); + } + max_near_q_inserts += far_buffers[num_near_q_insert_buffers].size(); + ++num_near_q_insert_buffers; + } while ((max_near_q_inserts < target_near_q_size) && + (num_near_q_insert_buffers < num_far_buffers)); + + rmm::device_uvector new_near_q_keys(0, handle.get_stream()); + new_near_q_keys.reserve(max_near_q_inserts, handle.get_stream()); + + for (size_t i = 0; i < num_far_buffers; ++i) { + auto invalid_threshold = invalid_thresholds[i]; + + if (i == (num_far_buffers - 1)) { + std::vector h_split_thresholds(num_near_q_insert_buffers); + h_split_thresholds[0] = + (num_far_buffers == num_near_q_insert_buffers) + ? near_far_threshold + : next_far_thresholds[(num_far_buffers - num_near_q_insert_buffers) - 1]; + for (size_t j = 1; j < h_split_thresholds.size(); ++j) { + h_split_thresholds[j] = + next_far_thresholds[(num_far_buffers - num_near_q_insert_buffers) + (j - 1)]; + } + rmm::device_uvector d_split_thresholds(h_split_thresholds.size(), + handle.get_stream()); + raft::update_device(d_split_thresholds.data(), + h_split_thresholds.data(), + h_split_thresholds.size(), + handle.get_stream()); + + rmm::device_uvector tmp_buffer = std::move(far_buffers.back()); + std::vector h_buffer_ptrs(h_split_thresholds.size() + 1); + auto old_size = new_near_q_keys.size(); + for (size_t j = 0; j < h_buffer_ptrs.size(); ++j) { + if (j == 0 && (num_far_buffers == num_near_q_insert_buffers)) { + new_near_q_keys.resize(old_size + tmp_buffer.size(), handle.get_stream()); + h_buffer_ptrs[j] = new_near_q_keys.data() + old_size; + } else { + auto buffer_idx = (num_far_buffers - num_near_q_insert_buffers) + (j - 1); + far_buffers[buffer_idx].reserve( + raft::round_up_safe(std::max(tmp_buffer.size(), size_t{1}), + key_buffer_capacity_increment), + handle.get_stream()); + far_buffers[buffer_idx].resize(tmp_buffer.size(), handle.get_stream()); + h_buffer_ptrs[j] = far_buffers[buffer_idx].data(); + } + } + rmm::device_uvector d_buffer_ptrs(num_near_q_insert_buffers + 1, + handle.get_stream()); + raft::update_device(d_buffer_ptrs.data(), + h_buffer_ptrs.data(), + h_buffer_ptrs.size(), + handle.get_stream()); + rmm::device_uvector d_counters(num_near_q_insert_buffers + 1, + handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), d_counters.begin(), d_counters.end(), size_t{0}); + if (tmp_buffer.size() > 0) { + auto distance_first = thrust::make_transform_iterator( + tmp_buffer.begin(), + [key_to_dist_map = detail::kv_cuco_store_find_device_view_t( + key_to_dist_map.view())] __device__(auto key) { + return key_to_dist_map.find(key); + }); + auto input_first = thrust::make_zip_iterator(tmp_buffer.begin(), distance_first); + raft::grid_1d_thread_t update_grid(tmp_buffer.size(), + multi_partition_copy_block_size, + handle.get_device_properties().maxGridSize[0]); + auto constexpr max_num_partitions = + static_cast(1 /* near queue */ + num_far_buffers); + multi_partition_copy + <<>>( + input_first, + input_first + tmp_buffer.size(), + raft::device_span(d_buffer_ptrs.data(), d_buffer_ptrs.size()), + [split_thresholds = raft::device_span( + d_split_thresholds.data(), d_split_thresholds.size()), + invalid_threshold] __device__(auto pair) { + auto dist = thrust::get<1>(pair); + return static_cast( + (dist < invalid_threshold) + ? 
max_num_partitions /* discard */ + : thrust::distance(split_thresholds.begin(), + thrust::upper_bound(thrust::seq, + split_thresholds.begin(), + split_thresholds.end(), + dist))); + }, + raft::device_span(d_counters.data(), d_counters.size())); + } + std::vector h_counters(d_counters.size()); + raft::update_host( + h_counters.data(), d_counters.data(), d_counters.size(), handle.get_stream()); + handle.sync_stream(); + for (size_t j = 0; j < (num_near_q_insert_buffers + 1); ++j) { + if (j == 0 && (num_far_buffers == num_near_q_insert_buffers)) { + new_near_q_keys.resize(old_size + h_counters[j], handle.get_stream()); + } else { + auto buffer_idx = (num_far_buffers - num_near_q_insert_buffers) + (j - 1); + far_buffers[buffer_idx].resize(h_counters[j], handle.get_stream()); + } + } + } else if (i < num_near_q_insert_buffers) { + auto old_size = new_near_q_keys.size(); + new_near_q_keys.resize(old_size + far_buffers[i].size(), handle.get_stream()); + auto last = thrust::copy_if( + handle.get_thrust_policy(), + far_buffers[i].begin(), + far_buffers[i].end(), + new_near_q_keys.begin() + old_size, + is_no_smaller_than_threshold_t{ + invalid_threshold, + detail::kv_cuco_store_find_device_view_t(key_to_dist_map.view())}); + new_near_q_keys.resize(thrust::distance(new_near_q_keys.begin(), last), + handle.get_stream()); + } else { + far_buffers[i - num_near_q_insert_buffers] = std::move(far_buffers[i]); + } + } + + thrust::sort(handle.get_thrust_policy(), new_near_q_keys.begin(), new_near_q_keys.end()); + new_near_q_keys.resize(thrust::distance(new_near_q_keys.begin(), + thrust::unique(handle.get_thrust_policy(), + new_near_q_keys.begin(), + new_near_q_keys.end())), + handle.get_stream()); + auto near_vi_first = thrust::make_transform_iterator( + new_near_q_keys.begin(), + split_vi_t{static_cast(origins.size())}); + vertex_frontier.bucket(bucket_idx_near) + .insert(near_vi_first, near_vi_first + new_near_q_keys.size()); + + near_size = vertex_frontier.bucket(bucket_idx_near).size(); + tot_far_size = far_buffers[0].size(); + for (size_t i = 1; i < num_far_buffers; ++i) { + tot_far_size += far_buffers[i].size(); + } + } while ((near_size == 0) && (tot_far_size > 0)); + + // this assumes that # inserts with the previous near_far_threshold is a good estimate for + // # inserts with the next near_far_threshold + auto next_near_far_threshold_num_inserts_estimate = + static_cast(static_cast(prev_near_far_threshold_num_inserts) * + std::max(static_cast(num_near_q_insert_buffers) / + static_cast(prev_num_near_q_insert_buffers), + 1.0) * + 1.2); + prev_near_far_threshold_num_inserts = 0; + prev_num_near_q_insert_buffers = num_near_q_insert_buffers; + + if (key_to_dist_map.size() + next_near_far_threshold_num_inserts_estimate >= + key_to_dist_map.capacity()) { // if resize is likely to be necessary before reaching + // this check again + std::vector> far_buffer_spans(num_far_buffers); + for (size_t i = 0; i < num_far_buffers; ++i) { + far_buffer_spans[i] = + raft::device_span(far_buffers[i].data(), far_buffers[i].size()); + } + key_to_dist_map = filter_key_to_dist_map(handle, + graph_view, + std::move(key_to_dist_map), + vertex_frontier.bucket(bucket_idx_near), + far_buffer_spans, + old_near_far_threshold, + next_near_far_threshold_num_inserts_estimate, + kv_store_capacity_increment, + origins.size()); + } + if ((near_size == 0) && (tot_far_size == 0)) { break; } + } else { + break; + } + } + } + + return od_matrix; +} + +} // namespace + +template +rmm::device_uvector od_shortest_distances( + raft::handle_t 
const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + weight_t cutoff, + bool do_expensive_check) +{ + CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); + + auto const num_vertices = graph_view.number_of_vertices(); + auto const num_edges = graph_view.number_of_edges(); + + weight_t average_vertex_degree = + static_cast(num_edges) / static_cast(num_vertices); + auto average_edge_weight = transform_reduce_e( + handle, + graph_view, + edge_src_dummy_property_t{}.view(), + edge_dst_dummy_property_t{}.view(), + edge_weight_view, + [] __device__(vertex_t, vertex_t, auto, auto, weight_t w) { return w; }, + weight_t{0.0}); + { + // the above transform_reduce_e output can vary in each run due to floating point round-off + // error; we reduce the precision of the significand here to reduce the non-determinism due to + // the difference in delta in each run (this still does not guarantee that we will get the same + // delta in every run for every graph) + assert(average_edge_weight >= 0.0); + int exponent{}; + auto significand = + frexp(average_edge_weight, + &exponent); // average_edge_weight = significand * 2^exponent, 0.5 <= significand < 1.0 + significand = round(significand * 1000.0) / 1000.0; + average_edge_weight = ldexp(significand, exponent); + } + average_edge_weight /= static_cast(num_edges); + // FIXME: better to use min_edge_weight instead of std::numeric_limits::min() * 1e3 for the + // forward progress guarantee; transform_reduce_e should be updated to support min reduction. + auto delta = std::max(average_edge_weight * 0.5, std::numeric_limits::min() * 1e3); + + return od_shortest_distances, weight_t>( + handle, + graph_view, + edge_weight_view, + raft::device_span(origins.data(), origins.size()), + raft::device_span(destinations.data(), destinations.size()), + cutoff, + delta, + do_expensive_check); +} + +} // namespace cugraph diff --git a/cpp/src/traversal/od_shortest_distances_sg.cu b/cpp/src/traversal/od_shortest_distances_sg.cu new file mode 100644 index 00000000000..43c6f516374 --- /dev/null +++ b/cpp/src/traversal/od_shortest_distances_sg.cu @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#include + +namespace cugraph { + +// SG instantiation + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + float cutoff, + bool do_expensive_check); + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + double cutoff, + bool do_expensive_check); + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + float cutoff, + bool do_expensive_check); + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + double cutoff, + bool do_expensive_check); + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + float cutoff, + bool do_expensive_check); + +template rmm::device_uvector od_shortest_distances( + raft::handle_t const& handle, + graph_view_t const& graph_view, + edge_property_view_t edge_weight_view, + raft::device_span origins, + raft::device_span destinations, + double cutoff, + bool do_expensive_check); + +} // namespace cugraph diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 3b376c78e43..e91b7e71537 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -351,6 +351,10 @@ ConfigureTest(MSBFS_TEST traversal/ms_bfs_test.cu) # - SSSP tests ------------------------------------------------------------------------------------ ConfigureTest(SSSP_TEST traversal/sssp_test.cpp) +################################################################################################### +# - OD_SHORTEST_DISTANCES tests ------------------------------------------------------------------- +ConfigureTest(OD_SHORTEST_DISTANCES_TEST traversal/od_shortest_distances_test.cpp) + ################################################################################################### # - HITS tests ------------------------------------------------------------------------------------ ConfigureTest(HITS_TEST link_analysis/hits_test.cpp) diff --git a/cpp/tests/traversal/od_shortest_distances_test.cpp b/cpp/tests/traversal/od_shortest_distances_test.cpp new file mode 100644 index 00000000000..cc283f24dfd --- /dev/null +++ b/cpp/tests/traversal/od_shortest_distances_test.cpp @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +// Dijkstra's algorithm +template +void sssp_reference(edge_t const* offsets, + vertex_t const* indices, + weight_t const* weights, + weight_t* distances, + vertex_t num_vertices, + vertex_t source, + weight_t cutoff = std::numeric_limits::max()) +{ + using queue_item_t = std::tuple; + + std::fill(distances, distances + num_vertices, std::numeric_limits::max()); + + *(distances + source) = weight_t{0.0}; + std::priority_queue, std::greater> queue{}; + queue.push(std::make_tuple(weight_t{0.0}, source)); + + while (queue.size() > 0) { + weight_t distance{}; + vertex_t row{}; + std::tie(distance, row) = queue.top(); + queue.pop(); + if (distance <= *(distances + row)) { + auto nbr_offsets = *(offsets + row); + auto nbr_offset_last = *(offsets + row + 1); + for (auto nbr_offset = nbr_offsets; nbr_offset != nbr_offset_last; ++nbr_offset) { + auto nbr = *(indices + nbr_offset); + auto new_distance = distance + *(weights + nbr_offset); + auto threshold = std::min(*(distances + nbr), cutoff); + if (new_distance < threshold) { + *(distances + nbr) = new_distance; + queue.push(std::make_tuple(new_distance, nbr)); + } + } + } + } +} + +struct ODShortestDistances_Usecase { + size_t num_origins{0}; + size_t num_destinations{0}; + bool check_correctness{true}; +}; + +template +class Tests_ODShortestDistances + : public ::testing::TestWithParam> { + public: + Tests_ODShortestDistances() {} + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + template + void run_current_test(ODShortestDistances_Usecase const& od_usecase, + input_usecase_t const& input_usecase) + { + constexpr bool renumber = true; + + raft::handle_t handle{}; + HighResTimer hr_timer{}; + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_timer.start("Construct graph"); + } + + auto [graph, edge_weights, d_renumber_map_labels] = + cugraph::test::construct_graph( + handle, input_usecase, true, renumber); + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_timer.stop(); + hr_timer.display_and_clear(std::cout); + } + + auto graph_view = graph.view(); + auto edge_weight_view = + edge_weights ? 
std::make_optional((*edge_weights).view()) : std::nullopt; + + raft::random::RngState rng_state(0); + auto origins = cugraph::select_random_vertices( + handle, graph_view, std::nullopt, rng_state, od_usecase.num_origins, false, true); + auto destinations = cugraph::select_random_vertices( + handle, graph_view, std::nullopt, rng_state, od_usecase.num_destinations, false, true); + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_timer.start("ODShortestDistances"); + } + + auto od_matrix = cugraph::od_shortest_distances( + handle, + graph_view, + *edge_weight_view, + raft::device_span(origins.data(), origins.size()), + raft::device_span(destinations.data(), destinations.size()), + std::numeric_limits::max(), + false); + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + hr_timer.stop(); + hr_timer.display_and_clear(std::cout); + } + + if (od_usecase.check_correctness) { + cugraph::graph_t unrenumbered_graph(handle); + std::optional< + cugraph::edge_property_t, weight_t>> + unrenumbered_edge_weights{std::nullopt}; + if (renumber) { + std::tie(unrenumbered_graph, unrenumbered_edge_weights, std::ignore) = + cugraph::test::construct_graph( + handle, input_usecase, true, false); + } + auto unrenumbered_graph_view = renumber ? unrenumbered_graph.view() : graph_view; + auto unrenumbered_edge_weight_view = + renumber + ? (unrenumbered_edge_weights ? std::make_optional((*unrenumbered_edge_weights).view()) + : std::nullopt) + : edge_weight_view; + + auto h_offsets = cugraph::test::to_host( + handle, unrenumbered_graph_view.local_edge_partition_view().offsets()); + auto h_indices = cugraph::test::to_host( + handle, unrenumbered_graph_view.local_edge_partition_view().indices()); + auto h_weights = cugraph::test::to_host( + handle, + raft::device_span((*unrenumbered_edge_weight_view).value_firsts()[0], + (*unrenumbered_edge_weight_view).edge_counts()[0])); + + auto unrenumbered_origins = cugraph::test::to_host(handle, origins); + auto unrenumbered_destinations = cugraph::test::to_host(handle, destinations); + if (renumber) { + auto h_renumber_map_labels = cugraph::test::to_host(handle, *d_renumber_map_labels); + std::transform(unrenumbered_origins.begin(), + unrenumbered_origins.end(), + unrenumbered_origins.begin(), + [&h_renumber_map_labels](auto v) { return h_renumber_map_labels[v]; }); + std::transform(unrenumbered_destinations.begin(), + unrenumbered_destinations.end(), + unrenumbered_destinations.begin(), + [&h_renumber_map_labels](auto v) { return h_renumber_map_labels[v]; }); + } + + std::vector h_reference_od_matrix(od_matrix.size()); + for (size_t i = 0; i < unrenumbered_origins.size(); ++i) { + std::vector reference_distances(unrenumbered_graph_view.number_of_vertices()); + + sssp_reference(h_offsets.data(), + h_indices.data(), + h_weights.data(), + reference_distances.data(), + unrenumbered_graph_view.number_of_vertices(), + unrenumbered_origins[i], + std::numeric_limits::max()); + + for (size_t j = 0; j < unrenumbered_destinations.size(); ++j) { + h_reference_od_matrix[i * unrenumbered_destinations.size() + j] = + reference_distances[unrenumbered_destinations[j]]; + } + } + + auto h_cugraph_od_matrix = cugraph::test::to_host(handle, od_matrix); + + auto max_weight_element = std::max_element(h_weights.begin(), h_weights.end()); + auto epsilon = (*max_weight_element) * weight_t{1e-6}; + auto nearly_equal = [epsilon](auto lhs, auto rhs) { return std::fabs(lhs - 
rhs) < epsilon; }; + + ASSERT_TRUE(std::equal(h_reference_od_matrix.begin(), + h_reference_od_matrix.end(), + h_cugraph_od_matrix.begin(), + nearly_equal)) + << "distances do not match with the reference values."; + } + } +}; + +using Tests_ODShortestDistances_File = Tests_ODShortestDistances; +using Tests_ODShortestDistances_Rmat = Tests_ODShortestDistances; + +TEST_P(Tests_ODShortestDistances_File, CheckInt32Int32Float) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_ODShortestDistances_Rmat, CheckInt32Int32Float) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_ODShortestDistances_Rmat, CheckInt32Int64Float) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_ODShortestDistances_Rmat, CheckInt64Int64Float) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +INSTANTIATE_TEST_SUITE_P( + file_test, + Tests_ODShortestDistances_File, + // enable correctness checks + ::testing::Values(std::make_tuple(ODShortestDistances_Usecase{5, 5}, + cugraph::test::File_Usecase("test/datasets/karate.mtx")), + std::make_tuple(ODShortestDistances_Usecase{10, 20}, + cugraph::test::File_Usecase("test/datasets/netscience.mtx")), + std::make_tuple(ODShortestDistances_Usecase{50, 100}, + cugraph::test::File_Usecase("test/datasets/wiki2003.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_ODShortestDistances_Rmat, + // enable correctness checks + ::testing::Values( + std::make_tuple(ODShortestDistances_Usecase{10, 100}, + cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + +INSTANTIATE_TEST_SUITE_P( + rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with + --gtest_filter to select only the rmat_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one Rmat_Usecase that differ only in scale or edge + factor (to avoid running same benchmarks more than once) */ + Tests_ODShortestDistances_Rmat, + // disable correctness checks for large graphs + ::testing::Values( + std::make_tuple(ODShortestDistances_Usecase{10, 100, false}, + cugraph::test::Rmat_Usecase(20, 32, 0.57, 0.19, 0.19, 0, false, false)))); + +CUGRAPH_TEST_PROGRAM_MAIN()
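For reviewers, a minimal single-GPU usage sketch of the new public API follows. It is illustrative only: graph construction is elided, int32_t vertices and float weights are assumed (as in the CheckInt32Int32Float test above), and the names handle, graph_view, edge_weight_view, d_origins, and d_destinations are hypothetical objects set up the same way the test sets them up.

  #include <cugraph/algorithms.hpp>
  #include <raft/core/device_span.hpp>
  #include <limits>

  // handle, graph_view, edge_weight_view, d_origins, d_destinations prepared beforehand;
  // d_origins / d_destinations are rmm::device_uvector<int32_t> holding the requested vertex ids.
  auto od_matrix = cugraph::od_shortest_distances(
    handle,
    graph_view,
    *edge_weight_view,
    raft::device_span<int32_t const>(d_origins.data(), d_origins.size()),
    raft::device_span<int32_t const>(d_destinations.data(), d_destinations.size()),
    std::numeric_limits<float>::max(),  // cutoff: anything farther is reported as unreachable
    false);                             // do_expensive_check

  // od_matrix[i * d_destinations.size() + j] is the shortest distance from the i'th origin to
  // the j'th destination (row-major, one row per origin), the same indexing the reference check
  // in the test above uses.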