From f30a382e1bc0202fcfa36eab36042a65d674b6d9 Mon Sep 17 00:00:00 2001 From: zanmato Date: Wed, 3 Jan 2024 10:09:43 -0800 Subject: [PATCH] Add match bitvector update for left joins --- cpp/src/arrow/acero/swiss_join.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index b1937407252f9..6b29448cea516 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -2256,7 +2256,12 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id, materialize_key_ids_buf.mutable_data(), materialize_payload_ids_buf.mutable_data(), /*output_payload_ids=*/true, !(no_duplicate_keys || no_payload_columns), temp_stack, &num_matches_next)); - // TODO: Index to bit vector. + std::memset(filtered_bitvector_buf.mutable_data(), 0, + bit_util::BytesForBits(minibatch_size_next)); + for (int i = 0; i < num_matches_next; ++i) { + int bit_idx = materialize_batch_ids_buf.mutable_data()[i] - minibatch_start; + bit_util::SetBitTo(filtered_bitvector_buf.mutable_data(), bit_idx, 1); + } } const uint16_t* materialize_batch_ids = materialize_batch_ids_buf.mutable_data(); const uint32_t* materialize_key_ids = materialize_key_ids_buf.mutable_data(); @@ -2289,8 +2294,11 @@ Status JoinProbeProcessor::OnNextBatch(int64_t thread_id, // the other side of the join. // if (join_type_ == JoinType::LEFT_OUTER || join_type_ == JoinType::FULL_OUTER) { - if (residual_filter_) { - // TODO: and match bit vector. + if (!residual_filter_->IsTrivial()) { + arrow::internal::BitmapAnd(match_bitvector_buf.mutable_data(), 0, + filtered_bitvector_buf.mutable_data(), 0, + minibatch_size_next, 0, + match_bitvector_buf.mutable_data()); } int num_passing_ids = 0; arrow::util::bit_util::bits_to_indexes(