From b99b00dd66586cf54b04ce6a51eb1cf68b1510a3 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 21:35:00 +0800 Subject: [PATCH] GH-40994: [C++][Parquet] RleBooleanDecoder supports DecodeArrow with nulls (#40995) ### Rationale for this change Supports DecodeArrow with nulls in RleBooleanDecoder ### What changes are included in this PR? Supports DecodeArrow with nulls in RleBooleanDecoder ### Are these changes tested? Yes ### Are there any user-facing changes? Not currently * GitHub Issue: #40994 Lead-authored-by: mwish Co-authored-by: mwish Signed-off-by: Antoine Pitrou --- cpp/src/parquet/encoding.cc | 63 ++++++++++++++++++++------- cpp/src/parquet/encoding_benchmark.cc | 9 ++-- cpp/src/parquet/encoding_test.cc | 46 +++++++++---------- 3 files changed, 74 insertions(+), 44 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f16e9b34fc682..6e93b493392c9 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3143,27 +3143,58 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits<BooleanType>::Accumulator* out) override { - if (null_count != 0) { - // TODO(ARROW-34660): implement DecodeArrow with null slots. - ParquetException::NYI("RleBoolean DecodeArrow with null slots"); + if (null_count == num_values) { + PARQUET_THROW_NOT_OK(out->AppendNulls(null_count)); + return 0; } constexpr int kBatchSize = 1024; std::array<bool, kBatchSize> values; - int sum_decode_count = 0; - do { - int current_batch = std::min(kBatchSize, num_values); - int decoded_count = decoder_->GetBatch(values.data(), current_batch); - if (decoded_count == 0) { - break; + const int num_non_null_values = num_values - null_count; + // Remaining non-null boolean values to read from decoder. + // We decode from `decoder_` with maximum 1024 size batches. 
+ int num_remain_non_null_values = num_non_null_values; + int current_index_in_batch = 0; + int current_batch_size = 0; + auto next_boolean_batch = [&]() { + DCHECK_GT(num_remain_non_null_values, 0); + DCHECK_EQ(current_index_in_batch, current_batch_size); + current_batch_size = std::min(num_remain_non_null_values, kBatchSize); + int decoded_count = decoder_->GetBatch(values.data(), current_batch_size); + if (ARROW_PREDICT_FALSE(decoded_count != current_batch_size)) { + // required values is more than values in decoder. + ParquetException::EofException(); } - sum_decode_count += decoded_count; - PARQUET_THROW_NOT_OK(out->Reserve(sum_decode_count)); - for (int i = 0; i < decoded_count; ++i) { - PARQUET_THROW_NOT_OK(out->Append(values[i])); + num_remain_non_null_values -= current_batch_size; + current_index_in_batch = 0; + }; + + // Reserve all values including nulls first + PARQUET_THROW_NOT_OK(out->Reserve(num_values)); + if (null_count == 0) { + // Fast-path for not having nulls. + do { + next_boolean_batch(); + PARQUET_THROW_NOT_OK( + out->AppendValues(values.begin(), values.begin() + current_batch_size)); + num_values -= current_batch_size; + current_index_in_batch = 0; + } while (num_values > 0); + return num_non_null_values; + } + auto next_value = [&]() -> bool { + if (current_index_in_batch == current_batch_size) { + next_boolean_batch(); + DCHECK_GT(current_batch_size, 0); } - num_values -= decoded_count; - } while (num_values > 0); - return sum_decode_count; + DCHECK_LT(current_index_in_batch, current_batch_size); + bool value = values[current_index_in_batch]; + ++current_index_in_batch; + return value; + }; + VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { out->UnsafeAppend(next_value()); }, [&]() { out->UnsafeAppendNull(); }); + return num_non_null_values; } int DecodeArrow( diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 9c07d262b350e..a858c53e931d8 100644 --- 
a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -1518,11 +1518,10 @@ BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull) (benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull) ->Range(MIN_RANGE, MAX_RANGE); -// TODO(mwish): RleBoolean not implemented DecodeArrow with null slots yet. -// BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) -//(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); } -// BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) -// ->Apply(BooleanWithNullCustomArguments); +BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) +(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); } +BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) + ->Apply(BooleanWithNullCustomArguments); BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanPlain, DecodeArrow) (benchmark::State& state) { DecodeArrowDenseBenchmark(state); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index ea0029f4c7d7f..bb5126ce251d4 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -602,7 +602,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { // Check that one can put several Arrow arrays into a given encoder // and decode to the right values (see GH-36939) -TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { +TEST(BooleanArrayEncoding, AdHocRoundTrip) { std::vector<std::shared_ptr<::arrow::Array>> arrays{ ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), @@ -610,27 +610,29 @@ TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), }; - auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN, - /*use_dictionary=*/false); - for (const auto& array : arrays) { - encoder->Put(*array); - } - auto buffer = 
encoder->FlushValues(); - auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN); - EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); - decoder->SetData(static_cast<int>(expected->length()), buffer->data(), - static_cast<int>(buffer->size())); - - ::arrow::BooleanBuilder builder; - ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()), - decoder->DecodeArrow(static_cast<int>(expected->length()), - static_cast<int>(expected->null_count()), - expected->null_bitmap_data(), 0, &builder)); + for (auto encoding : {Encoding::PLAIN, Encoding::RLE}) { + auto encoder = MakeTypedEncoder<BooleanType>(encoding, + /*use_dictionary=*/false); + for (const auto& array : arrays) { + encoder->Put(*array); + } + auto buffer = encoder->FlushValues(); + auto decoder = MakeTypedDecoder<BooleanType>(encoding); + EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); + decoder->SetData(static_cast<int>(expected->length()), buffer->data(), + static_cast<int>(buffer->size())); + + ::arrow::BooleanBuilder builder; + ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()), + decoder->DecodeArrow(static_cast<int>(expected->length()), + static_cast<int>(expected->null_count()), + expected->null_bitmap_data(), 0, &builder)); - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(builder.Finish(&result)); - ASSERT_EQ(expected->length(), result->length()); - ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(builder.Finish(&result)); + ASSERT_EQ(expected->length(), result->length()); + ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); + } } template <typename T> @@ -963,8 +965,6 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { } TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) { - // TODO: test with nulls once RleBooleanDecoder::DecodeArrow supports them - this->null_probability_ = 0; for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->Rle(seed); }