Skip to content

Commit

Permalink
Add match bitvector update for left joins
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Jan 3, 2024
1 parent 4322db5 commit f30a382
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,12 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id,
materialize_key_ids_buf.mutable_data(),
materialize_payload_ids_buf.mutable_data(), /*output_payload_ids=*/true,
!(no_duplicate_keys || no_payload_columns), temp_stack, &num_matches_next));
// TODO: Index to bit vector.
std::memset(filtered_bitvector_buf.mutable_data(), 0,
bit_util::BytesForBits(minibatch_size_next));
for (int i = 0; i < num_matches_next; ++i) {
int bit_idx = materialize_batch_ids_buf.mutable_data()[i] - minibatch_start;
bit_util::SetBitTo(filtered_bitvector_buf.mutable_data(), bit_idx, 1);
}
}
const uint16_t* materialize_batch_ids = materialize_batch_ids_buf.mutable_data();
const uint32_t* materialize_key_ids = materialize_key_ids_buf.mutable_data();
Expand Down Expand Up @@ -2289,8 +2294,11 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id,
// the other side of the join.
//
if (join_type_ == JoinType::LEFT_OUTER || join_type_ == JoinType::FULL_OUTER) {
if (residual_filter_) {
// TODO: and match bit vector.
if (!residual_filter_->IsTrivial()) {
arrow::internal::BitmapAnd(match_bitvector_buf.mutable_data(), 0,
filtered_bitvector_buf.mutable_data(), 0,
minibatch_size_next, 0,
match_bitvector_buf.mutable_data());
}
int num_passing_ids = 0;
arrow::util::bit_util::bits_to_indexes(
Expand Down

0 comments on commit f30a382

Please sign in to comment.