From 8a33bea6dd47c66641fdef16aca20995a8b85b73 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 13:14:22 +0800 Subject: [PATCH 1/8] GH-40994: RleBooleanDecoder supports DecodeArrow with nulls --- cpp/src/parquet/encoding.cc | 57 +++++++++++++++++++-------- cpp/src/parquet/encoding_benchmark.cc | 9 ++--- cpp/src/parquet/encoding_test.cc | 4 -- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index f16e9b34fc682..49bef50ca153e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3143,27 +3143,52 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out) override { - if (null_count != 0) { - // TODO(ARROW-34660): implement DecodeArrow with null slots. - ParquetException::NYI("RleBoolean DecodeArrow with null slots"); + if (null_count == num_values) { + PARQUET_THROW_NOT_OK(out->AppendNulls(null_count)); + return 0; } constexpr int kBatchSize = 1024; std::array values; - int sum_decode_count = 0; - do { - int current_batch = std::min(kBatchSize, num_values); - int decoded_count = decoder_->GetBatch(values.data(), current_batch); - if (decoded_count == 0) { - break; + // Reserve all values including nulls first + PARQUET_THROW_NOT_OK(out->Reserve(num_values)); + const int num_boolean_values_sum = num_values - null_count; + const int num_boolean_values = num_boolean_values_sum; + int current_index_in_batch = 0; + int current_batch_size = 0; + auto next_boolean_batch = [&]() { + DCHECK_GT(num_boolean_values, 0); + DCHECK_EQ(current_index_in_batch, current_batch_size); + current_batch_size = std::min(num_boolean_values, kBatchSize); + int decoded_count = decoder_->GetBatch(values.data(), current_batch_size); + if (decoded_count != current_batch_size) { + ParquetException::EofException(); } - sum_decode_count += decoded_count; - PARQUET_THROW_NOT_OK(out->Reserve(sum_decode_count)); - for (int i = 0; i < decoded_count; ++i) { - PARQUET_THROW_NOT_OK(out->Append(values[i])); + current_index_in_batch = 0; + }; + if (null_count == 0) { + int sum_decode_count = 0; + do { + next_boolean_batch(); + sum_decode_count += current_batch_size; + PARQUET_THROW_NOT_OK( + out->AppendValues(values.begin(), values.begin() + current_batch_size)); + num_values -= current_batch_size; + current_index_in_batch = 0; + } while (num_values > 0); + return sum_decode_count; + } + auto next_value = [&]() -> bool { + if (current_index_in_batch == current_batch_size) { + next_boolean_batch(); } - num_values -= decoded_count; - } while (num_values > 0); - return sum_decode_count; + bool value = values.at(current_index_in_batch); + ++current_index_in_batch; + return value; + }; + VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { out->UnsafeAppend(next_value()); }, [&]() { out->UnsafeAppendNull(); }); + return num_boolean_values_sum; } int DecodeArrow( diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 9c07d262b350e..a858c53e931d8 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -1518,11 +1518,10 @@ BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull) (benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); } BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull) ->Range(MIN_RANGE, MAX_RANGE); -// TODO(mwish): RleBoolean not implemented DecodeArrow with null slots yet. -// BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) -//(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); } -// BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) -// ->Apply(BooleanWithNullCustomArguments); +BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) +(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); } +BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull) + ->Apply(BooleanWithNullCustomArguments); BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanPlain, DecodeArrow) (benchmark::State& state) { DecodeArrowDenseBenchmark(state); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index ea0029f4c7d7f..1674e9e9a456b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -963,16 +963,12 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) { } TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) { - // TODO: test with nulls once RleBooleanDecoder::DecodeArrow supports them - this->null_probability_ = 0; for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->Rle(seed); } } TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) { - // TODO: test with nulls once DeltaBitPackDecoder::DecodeArrow supports them - this->null_probability_ = 0; for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->DeltaBitPack(seed); } From f83b959e35e765dec6452b6aa60b0ddd991f27b2 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 13:28:01 +0800 Subject: [PATCH 2/8] fix test --- cpp/src/parquet/encoding_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 1674e9e9a456b..b434a53d480a4 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -969,6 +969,8 @@ TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) { } TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) { + // TODO: test with nulls once DeltaBitPackDecoder::DecodeArrow supports them + this->null_probability_ = 0; for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { this->DeltaBitPack(seed); } From b6f655201a0b72da1a35f66bb5fc20dd8e86b718 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 16:10:51 +0800 Subject: [PATCH 3/8] resolve comment --- cpp/src/parquet/encoding.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 49bef50ca153e..dd55973f92d53 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3180,8 +3180,10 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { auto next_value = [&]() -> bool { if (current_index_in_batch == current_batch_size) { next_boolean_batch(); + DCHECK_GT(current_batch_size, 0); } - bool value = values.at(current_index_in_batch); + DCHECK_LT(current_index_in_batch, current_batch_size); + bool value = values[current_index_in_batch]; ++current_index_in_batch; return value; }; From cb1a8421780e92a01b5807ea8d85e7e7748ef436 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 16:56:14 +0800 Subject: [PATCH 4/8] fix bug for decodeArrow --- cpp/src/parquet/encoding.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dd55973f92d53..2f09515ff075d 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3152,7 +3152,8 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // Reserve all values including nulls first PARQUET_THROW_NOT_OK(out->Reserve(num_values)); const int num_boolean_values_sum = num_values - null_count; - const int num_boolean_values = num_boolean_values_sum; + // Remaining boolean values to read + int num_boolean_values = num_boolean_values_sum; int current_index_in_batch = 0; int current_batch_size = 0; auto next_boolean_batch = [&]() { @@ -3163,6 +3164,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { if (decoded_count != current_batch_size) { ParquetException::EofException(); } + num_boolean_values -= current_batch_size; current_index_in_batch = 0; }; if (null_count == 0) { From bc77a15edf3eb010ed8688be47dcd9b796b32d22 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 17:10:24 +0800 Subject: [PATCH 5/8] add a test for DecodeArrow --- cpp/src/parquet/encoding_test.cc | 44 +++++++++++++++++--------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index b434a53d480a4..bb5126ce251d4 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -602,7 +602,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { // Check that one can put several Arrow arrays into a given encoder // and decode to the right values (see GH-36939) -TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { +TEST(BooleanArrayEncoding, AdHocRoundTrip) { std::vector> arrays{ ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), @@ -610,27 +610,29 @@ TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), }; - auto encoder = MakeTypedEncoder(Encoding::PLAIN, - /*use_dictionary=*/false); - for (const auto& array : arrays) { - encoder->Put(*array); - } - auto buffer = encoder->FlushValues(); - auto decoder = MakeTypedDecoder(Encoding::PLAIN); - EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); - decoder->SetData(static_cast(expected->length()), buffer->data(), - static_cast(buffer->size())); - - ::arrow::BooleanBuilder builder; - ASSERT_EQ(static_cast(expected->length() - expected->null_count()), - decoder->DecodeArrow(static_cast(expected->length()), - static_cast(expected->null_count()), - expected->null_bitmap_data(), 0, &builder)); + for (auto encoding : {Encoding::PLAIN, Encoding::RLE}) { + auto encoder = MakeTypedEncoder(encoding, + /*use_dictionary=*/false); + for (const auto& array : arrays) { + encoder->Put(*array); + } + auto buffer = encoder->FlushValues(); + auto decoder = MakeTypedDecoder(encoding); + EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); + decoder->SetData(static_cast(expected->length()), buffer->data(), + static_cast(buffer->size())); + + ::arrow::BooleanBuilder builder; + ASSERT_EQ(static_cast(expected->length() - expected->null_count()), + decoder->DecodeArrow(static_cast(expected->length()), + static_cast(expected->null_count()), + expected->null_bitmap_data(), 0, &builder)); - std::shared_ptr<::arrow::Array> result; - ASSERT_OK(builder.Finish(&result)); - ASSERT_EQ(expected->length(), result->length()); - ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(builder.Finish(&result)); + ASSERT_EQ(expected->length(), result->length()); + ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); + } } template From 43d0ea7dd13ade8f6efdce89d8314158905aa5eb Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 17:15:06 +0800 Subject: [PATCH 6/8] resolve comment --- cpp/src/parquet/encoding.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 2f09515ff075d..16a6dbb4cf65a 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3151,20 +3151,20 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { std::array values; // Reserve all values including nulls first PARQUET_THROW_NOT_OK(out->Reserve(num_values)); - const int num_boolean_values_sum = num_values - null_count; + const int num_non_null_values = num_values - null_count; // Remaining boolean values to read - int num_boolean_values = num_boolean_values_sum; + int num_remain_non_null_values = num_non_null_values; int current_index_in_batch = 0; int current_batch_size = 0; auto next_boolean_batch = [&]() { - DCHECK_GT(num_boolean_values, 0); + DCHECK_GT(num_remain_non_null_values, 0); DCHECK_EQ(current_index_in_batch, current_batch_size); - current_batch_size = std::min(num_boolean_values, kBatchSize); + current_batch_size = std::min(num_remain_non_null_values, kBatchSize); int decoded_count = decoder_->GetBatch(values.data(), current_batch_size); if (decoded_count != current_batch_size) { ParquetException::EofException(); } - num_boolean_values -= current_batch_size; + num_remain_non_null_values -= current_batch_size; current_index_in_batch = 0; }; if (null_count == 0) { @@ -3192,7 +3192,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { out->UnsafeAppend(next_value()); }, [&]() { out->UnsafeAppendNull(); }); - return num_boolean_values_sum; + return num_non_null_values; } int DecodeArrow( From df15e101b30edfa6f926df85157b7ae9af47a2b0 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 17:17:09 +0800 Subject: [PATCH 7/8] resolve comment --- cpp/src/parquet/encoding.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 16a6dbb4cf65a..519c261afc81e 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3161,7 +3161,8 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { DCHECK_EQ(current_index_in_batch, current_batch_size); current_batch_size = std::min(num_remain_non_null_values, kBatchSize); int decoded_count = decoder_->GetBatch(values.data(), current_batch_size); - if (decoded_count != current_batch_size) { + if (ARROW_PREDICT_FALSE(decoded_count != current_batch_size)) { + // required values is more than values in decoder. ParquetException::EofException(); } num_remain_non_null_values -= current_batch_size; From b0ba4e192b622f4a76aae6a1981d3a558abf503b Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 4 Apr 2024 19:51:46 +0800 Subject: [PATCH 8/8] simplify variable --- cpp/src/parquet/encoding.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 519c261afc81e..6e93b493392c9 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -3149,10 +3149,9 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { } constexpr int kBatchSize = 1024; std::array values; - // Reserve all values including nulls first - PARQUET_THROW_NOT_OK(out->Reserve(num_values)); const int num_non_null_values = num_values - null_count; - // Remaining boolean values to read + // Remaining non-null boolean values to read from decoder. + // We decode from `decoder_` with maximum 1024 size batches. int num_remain_non_null_values = num_non_null_values; int current_index_in_batch = 0; int current_batch_size = 0; @@ -3168,17 +3167,19 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { num_remain_non_null_values -= current_batch_size; current_index_in_batch = 0; }; + + // Reserve all values including nulls first + PARQUET_THROW_NOT_OK(out->Reserve(num_values)); if (null_count == 0) { - int sum_decode_count = 0; + // Fast-path for not having nulls. do { next_boolean_batch(); - sum_decode_count += current_batch_size; PARQUET_THROW_NOT_OK( out->AppendValues(values.begin(), values.begin() + current_batch_size)); num_values -= current_batch_size; current_index_in_batch = 0; } while (num_values > 0); - return sum_decode_count; + return num_non_null_values; } auto next_value = [&]() -> bool { if (current_index_in_batch == current_batch_size) {