Skip to content

Commit

Permalink
Init residual filter in probe processor
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Jan 2, 2024
1 parent 8bfb8d7 commit 4322db5
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1930,8 +1930,8 @@ Status JoinResidualFilter::FilterMatchBitVector(
}

if (num_build_keys_referred_ == 0 && num_build_payloads_referred_ == 0) {
// If filter refers no column in right table,
// TODO
// If the filter refers to no column in the right table, then we can directly filter on
// the left rows without inner matching and materializing the right rows.
//
arrow::util::bit_util::bits_to_indexes(bit_match, hardware_flags_, num_batch_rows,
match_bitvector, num_passing_ids,
Expand All @@ -1951,10 +1951,11 @@ Status JoinResidualFilter::FilterMatchBitVector(
passing_batch_row_ids[i] += static_cast<uint16_t>(batch_start_row);
}

RETURN_NOT_OK(FilterMatchRowIds(keypayload_batch, *num_passing_ids,
passing_batch_row_ids, passing_key_ids_maybe_null,
NULLPTR, passing_key_ids_maybe_null, false,
temp_stack, num_passing_ids));
RETURN_NOT_OK(
FilterMatchRowIds(keypayload_batch, *num_passing_ids, passing_batch_row_ids,
passing_key_ids_maybe_null, /*payload_ids_maybe_null=*/NULLPTR,
passing_key_ids_maybe_null,
/*output_payload_ids=*/false, temp_stack, num_passing_ids));
return Status::OK();
}

Expand All @@ -1969,6 +1970,7 @@ Status JoinResidualFilter::FilterMatchBitVector(
match_iterator.SetLookupResult(num_batch_rows, batch_start_row, match_bitvector,
key_ids, no_duplicate_keys, key_to_payload_);
int num_matches_next = 0;
// Last row id passing the filter, used to filter out duplicate rows.
uint32_t row_id_last = std::numeric_limits<uint16_t>::max() + 1;
while (match_iterator.GetNextBatch(minibatch_size_, &num_matches_next,
materialize_batch_ids_buf.mutable_data(),
Expand All @@ -1978,8 +1980,8 @@ Status JoinResidualFilter::FilterMatchBitVector(
RETURN_NOT_OK(FilterMatchRowIds(
keypayload_batch, num_matches_next, materialize_batch_ids_buf.mutable_data(),
materialize_key_ids_buf.mutable_data(),
materialize_payload_ids_buf.mutable_data(), passing_key_ids_maybe_null, false,
temp_stack, &num_filtered));
materialize_payload_ids_buf.mutable_data(), passing_key_ids_maybe_null,
/*output_payload_ids=*/false, temp_stack, &num_filtered));
for (int ifiltered = 0; ifiltered < num_filtered; ++ifiltered) {
if (materialize_batch_ids_buf.mutable_data()[ifiltered] == row_id_last) {
continue;
Expand Down Expand Up @@ -2252,7 +2254,7 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id,
keypayload_batch, num_matches_next,
materialize_batch_ids_buf.mutable_data(),
materialize_key_ids_buf.mutable_data(),
materialize_payload_ids_buf.mutable_data(), true,
materialize_payload_ids_buf.mutable_data(), /*output_payload_ids=*/true,
!(no_duplicate_keys || no_payload_columns), temp_stack, &num_matches_next));
// TODO: Index to bit vector.
}
Expand Down

0 comments on commit 4322db5

Please sign in to comment.