From 4322db59d7c811b2d4b0f659bbcb9db0595ef9a0 Mon Sep 17 00:00:00 2001 From: zanmato Date: Tue, 2 Jan 2024 14:36:34 -0800 Subject: [PATCH] Init residual filter in probe processor --- cpp/src/arrow/acero/swiss_join.cc | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index ad8e44c585e7c..b1937407252f9 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1930,8 +1930,8 @@ Status JoinResidualFilter::FilterMatchBitVector( } if (num_build_keys_referred_ == 0 && num_build_payloads_referred_ == 0) { - // If filter refers no column in right table, - // TODO + // If filter refers no column in the right table, then we can directly filter on the + // left rows without inner matching and materializing the right rows. // arrow::util::bit_util::bits_to_indexes(bit_match, hardware_flags_, num_batch_rows, match_bitvector, num_passing_ids, @@ -1951,10 +1951,11 @@ Status JoinResidualFilter::FilterMatchBitVector( passing_batch_row_ids[i] += static_cast(batch_start_row); } - RETURN_NOT_OK(FilterMatchRowIds(keypayload_batch, *num_passing_ids, - passing_batch_row_ids, passing_key_ids_maybe_null, - NULLPTR, passing_key_ids_maybe_null, false, - temp_stack, num_passing_ids)); + RETURN_NOT_OK( + FilterMatchRowIds(keypayload_batch, *num_passing_ids, passing_batch_row_ids, + passing_key_ids_maybe_null, /*payload_ids_maybe_null=*/NULLPTR, + passing_key_ids_maybe_null, + /*output_payload_ids=*/false, temp_stack, num_passing_ids)); return Status::OK(); } @@ -1969,6 +1970,7 @@ Status JoinResidualFilter::FilterMatchBitVector( match_iterator.SetLookupResult(num_batch_rows, batch_start_row, match_bitvector, key_ids, no_duplicate_keys, key_to_payload_); int num_matches_next = 0; + // Last row id passing the filter, used to filter out duplicate rows. uint32_t row_id_last = std::numeric_limits::max() + 1; while (match_iterator.GetNextBatch(minibatch_size_, &num_matches_next, materialize_batch_ids_buf.mutable_data(), @@ -1978,8 +1980,8 @@ Status JoinResidualFilter::FilterMatchBitVector( RETURN_NOT_OK(FilterMatchRowIds( keypayload_batch, num_matches_next, materialize_batch_ids_buf.mutable_data(), materialize_key_ids_buf.mutable_data(), - materialize_payload_ids_buf.mutable_data(), passing_key_ids_maybe_null, false, - temp_stack, &num_filtered)); + materialize_payload_ids_buf.mutable_data(), passing_key_ids_maybe_null, + /*output_payload_ids=*/false, temp_stack, &num_filtered)); for (int ifiltered = 0; ifiltered < num_filtered; ++ifiltered) { if (materialize_batch_ids_buf.mutable_data()[ifiltered] == row_id_last) { continue; @@ -2252,7 +2254,7 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id, keypayload_batch, num_matches_next, materialize_batch_ids_buf.mutable_data(), materialize_key_ids_buf.mutable_data(), - materialize_payload_ids_buf.mutable_data(), true, + materialize_payload_ids_buf.mutable_data(), /*output_payload_ids=*/true, !(no_duplicate_keys || no_payload_columns), temp_stack, &num_matches_next)); // TODO: Index to bit vector. }