Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve distinct join with set retrieve #15636

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,10 @@ struct hasher_adapter {
template <cudf::has_nested HasNested>
struct distinct_hash_join {
private:
/// Row equality type for nested columns
using nested_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<true, cudf::nullate::DYNAMIC>>;
/// Row equality type for flat columns
using flat_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<false, cudf::nullate::DYNAMIC>>;

/// Device row equal type
using d_equal_type =
std::conditional_t<HasNested == cudf::has_nested::YES, nested_row_equal, flat_row_equal>;
using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<HasNested == cudf::has_nested::YES,
cudf::nullate::DYNAMIC>>;
using hasher = hasher_adapter<thrust::identity<hash_value_type>>;
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;
Expand Down
192 changes: 16 additions & 176 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ namespace cudf {
namespace detail {
namespace {

static auto constexpr DISTINCT_JOIN_BLOCK_SIZE = 256;

template <cudf::has_nested HasNested>
auto prepare_device_equal(
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> build,
Expand Down Expand Up @@ -82,175 +80,20 @@ class build_keys_fn {

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>`
* lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
*/
struct output_fn {
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, lhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
};

template <typename Tile>
__device__ void flush_buffer(Tile const& tile,
cudf::size_type tile_count,
cuco::pair<cudf::size_type, cudf::size_type>* buffer,
cudf::size_type* counter,
cudf::size_type* build_indices,
cudf::size_type* probe_indices)
{
cudf::size_type offset;
auto const lane_id = tile.thread_rank();
if (0 == lane_id) { offset = atomicAdd(counter, tile_count); }
offset = tile.shfl(offset, 0);

for (cudf::size_type i = lane_id; i < tile_count; i += tile.size()) {
auto const& [build_idx, probe_idx] = buffer[i];
*(build_indices + offset + i) = build_idx;
*(probe_indices + offset + i) = probe_idx;
}
}

__device__ void flush_buffer(cooperative_groups::thread_block const& block,
cudf::size_type buffer_size,
cuco::pair<cudf::size_type, cudf::size_type>* buffer,
cudf::size_type* counter,
cudf::size_type* build_indices,
cudf::size_type* probe_indices)
{
auto i = block.thread_rank();
__shared__ cudf::size_type offset;

if (i == 0) { offset = atomicAdd(counter, buffer_size); }
block.sync();

while (i < buffer_size) {
auto const& [build_idx, probe_idx] = buffer[i];
*(build_indices + offset + i) = build_idx;
*(probe_indices + offset + i) = probe_idx;

i += block.size();
}
}

// TODO: custom kernel to be replaced by cuco::static_set::retrieve
template <typename Iter, typename HashTable>
CUDF_KERNEL void distinct_join_probe_kernel(Iter iter,
cudf::size_type n,
HashTable hash_table,
cudf::size_type* counter,
cudf::size_type* build_indices,
cudf::size_type* probe_indices)
{
namespace cg = cooperative_groups;

auto constexpr tile_size = HashTable::cg_size;
auto constexpr window_size = HashTable::window_size;

auto idx = cudf::detail::grid_1d::global_thread_id() / tile_size;
auto const stride = cudf::detail::grid_1d::grid_stride() / tile_size;
auto const block = cg::this_thread_block();

// CG-based probing algorithm
if constexpr (tile_size != 1) {
auto const tile = cg::tiled_partition<tile_size>(block);

auto constexpr flushing_tile_size = cudf::detail::warp_size / window_size;
// random choice to tune
auto constexpr flushing_buffer_size = 2 * flushing_tile_size;
auto constexpr num_flushing_tiles = DISTINCT_JOIN_BLOCK_SIZE / flushing_tile_size;
auto constexpr max_matches = flushing_tile_size / tile_size;

auto const flushing_tile = cg::tiled_partition<flushing_tile_size>(block);
auto const flushing_tile_id = block.thread_rank() / flushing_tile_size;

__shared__ cuco::pair<cudf::size_type, cudf::size_type>
flushing_tile_buffer[num_flushing_tiles][flushing_tile_size];
// per flushing-tile counter to track number of filled elements
__shared__ cudf::size_type flushing_counter[num_flushing_tiles];

if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
flushing_tile.sync(); // sync still needed since cg.any doesn't imply a memory barrier

while (flushing_tile.any(idx < n)) {
bool active_flag = idx < n;
auto const active_flushing_tile =
cg::binary_partition<flushing_tile_size>(flushing_tile, active_flag);
if (active_flag) {
auto const found = hash_table.find(tile, *(iter + idx));
if (tile.thread_rank() == 0 and found != hash_table.end()) {
auto const offset = atomicAdd_block(&flushing_counter[flushing_tile_id], 1);
flushing_tile_buffer[flushing_tile_id][offset] = cuco::pair{
static_cast<cudf::size_type>(found->second), static_cast<cudf::size_type>(idx)};
}
}

flushing_tile.sync();
if (flushing_counter[flushing_tile_id] + max_matches > flushing_buffer_size) {
flush_buffer(flushing_tile,
flushing_counter[flushing_tile_id],
flushing_tile_buffer[flushing_tile_id],
counter,
build_indices,
probe_indices);
flushing_tile.sync();
if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
flushing_tile.sync();
}

idx += stride;
} // while

if (flushing_counter[flushing_tile_id] > 0) {
flush_buffer(flushing_tile,
flushing_counter[flushing_tile_id],
flushing_tile_buffer[flushing_tile_id],
counter,
build_indices,
probe_indices);
}
}
// Scalar probing for CG size 1
else {
using block_scan = cub::BlockScan<cudf::size_type, DISTINCT_JOIN_BLOCK_SIZE>;
__shared__ typename block_scan::TempStorage block_scan_temp_storage;

auto constexpr buffer_capacity = 2 * DISTINCT_JOIN_BLOCK_SIZE;
__shared__ cuco::pair<cudf::size_type, cudf::size_type> buffer[buffer_capacity];
cudf::size_type buffer_size = 0;

while (idx - block.thread_rank() < n) { // the whole thread block falls into the same iteration
auto const found = idx < n ? hash_table.find(*(iter + idx)) : hash_table.end();
auto const has_match = found != hash_table.end();

// Use a whole-block scan to calculate the output location
cudf::size_type offset;
cudf::size_type block_count;
block_scan(block_scan_temp_storage)
.ExclusiveSum(static_cast<cudf::size_type>(has_match), offset, block_count);

if (buffer_size + block_count > buffer_capacity) {
flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
block.sync();
buffer_size = 0;
}

if (has_match) {
buffer[buffer_size + offset] = cuco::pair{static_cast<cudf::size_type>(found->second),
static_cast<cudf::size_type>(idx)};
}
buffer_size += block_count;
block.sync();

idx += stride;
} // while

if (buffer_size > 0) {
flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
}
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, rhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
}
};
} // namespace

template <cudf::has_nested HasNested>
Expand Down Expand Up @@ -332,19 +175,16 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});
auto counter = rmm::device_scalar<cudf::size_type>{stream};
counter.set_value_to_zero_async(stream);

cudf::detail::grid_1d grid{probe_table_num_rows, DISTINCT_JOIN_BLOCK_SIZE};
distinct_join_probe_kernel<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
iter,
probe_table_num_rows,
this->_hash_table.ref(cuco::find),
counter.data(),
build_indices->data(),
probe_indices->data());

auto const actual_size = counter.value(stream);

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
auto const probe_indices_begin =
thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});

auto const [probe_indices_end, _] = this->_hash_table.retrieve(
iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, stream.value());

auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

Expand Down
Loading