Skip to content

Commit

Permalink
apacheGH-40994: [C++][Parquet] RleBooleanDecoder supports DecodeArrow…
Browse files Browse the repository at this point in the history
… with nulls (apache#40995)

### Rationale for this change

Supports DecodeArrow with nulls in RleBooleanDecoder

### What changes are included in this PR?

Supports DecodeArrow with nulls in RleBooleanDecoder

### Are these changes tested?

Yes

### Are there any user-facing changes?

currently not

* GitHub Issue: apache#40994

Lead-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: mwish <anmmscs_maple@qq.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
mapleFU and mapleFU authored Apr 4, 2024
1 parent bbeeb33 commit b99b00d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 44 deletions.
63 changes: 47 additions & 16 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3143,27 +3143,58 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::Accumulator* out) override {
if (null_count != 0) {
// TODO(ARROW-34660): implement DecodeArrow with null slots.
ParquetException::NYI("RleBoolean DecodeArrow with null slots");
if (null_count == num_values) {
PARQUET_THROW_NOT_OK(out->AppendNulls(null_count));
return 0;
}
constexpr int kBatchSize = 1024;
std::array<bool, kBatchSize> values;
int sum_decode_count = 0;
do {
int current_batch = std::min(kBatchSize, num_values);
int decoded_count = decoder_->GetBatch(values.data(), current_batch);
if (decoded_count == 0) {
break;
const int num_non_null_values = num_values - null_count;
// Remaining non-null boolean values to read from decoder.
// We decode from `decoder_` with maximum 1024 size batches.
int num_remain_non_null_values = num_non_null_values;
int current_index_in_batch = 0;
int current_batch_size = 0;
auto next_boolean_batch = [&]() {
DCHECK_GT(num_remain_non_null_values, 0);
DCHECK_EQ(current_index_in_batch, current_batch_size);
current_batch_size = std::min(num_remain_non_null_values, kBatchSize);
int decoded_count = decoder_->GetBatch(values.data(), current_batch_size);
if (ARROW_PREDICT_FALSE(decoded_count != current_batch_size)) {
// required values is more than values in decoder.
ParquetException::EofException();
}
sum_decode_count += decoded_count;
PARQUET_THROW_NOT_OK(out->Reserve(sum_decode_count));
for (int i = 0; i < decoded_count; ++i) {
PARQUET_THROW_NOT_OK(out->Append(values[i]));
num_remain_non_null_values -= current_batch_size;
current_index_in_batch = 0;
};

// Reserve all values including nulls first
PARQUET_THROW_NOT_OK(out->Reserve(num_values));
if (null_count == 0) {
// Fast-path for not having nulls.
do {
next_boolean_batch();
PARQUET_THROW_NOT_OK(
out->AppendValues(values.begin(), values.begin() + current_batch_size));
num_values -= current_batch_size;
current_index_in_batch = 0;
} while (num_values > 0);
return num_non_null_values;
}
auto next_value = [&]() -> bool {
if (current_index_in_batch == current_batch_size) {
next_boolean_batch();
DCHECK_GT(current_batch_size, 0);
}
num_values -= decoded_count;
} while (num_values > 0);
return sum_decode_count;
DCHECK_LT(current_index_in_batch, current_batch_size);
bool value = values[current_index_in_batch];
++current_index_in_batch;
return value;
};
VisitNullBitmapInline(
valid_bits, valid_bits_offset, num_values, null_count,
[&]() { out->UnsafeAppend(next_value()); }, [&]() { out->UnsafeAppendNull(); });
return num_non_null_values;
}

int DecodeArrow(
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/parquet/encoding_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1518,11 +1518,10 @@ BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull)
(benchmark::State& state) { DecodeArrowNonNullDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowNonNull)
->Range(MIN_RANGE, MAX_RANGE);
// TODO(mwish): RleBoolean not implemented DecodeArrow with null slots yet.
// BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
//(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); }
// BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
// ->Apply(BooleanWithNullCustomArguments);
BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
(benchmark::State& state) { DecodeArrowWithNullDenseBenchmark(state); }
BENCHMARK_REGISTER_F(BM_DecodeArrowBooleanRle, DecodeArrowWithNull)
->Apply(BooleanWithNullCustomArguments);

BENCHMARK_DEFINE_F(BM_DecodeArrowBooleanPlain, DecodeArrow)
(benchmark::State& state) { DecodeArrowDenseBenchmark(state); }
Expand Down
46 changes: 23 additions & 23 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,35 +602,37 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {

// Check that one can put several Arrow arrays into a given encoder
// and decode to the right values (see GH-36939)
TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) {
TEST(BooleanArrayEncoding, AdHocRoundTrip) {
std::vector<std::shared_ptr<::arrow::Array>> arrays{
::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"),
};

auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN,
/*use_dictionary=*/false);
for (const auto& array : arrays) {
encoder->Put(*array);
}
auto buffer = encoder->FlushValues();
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
static_cast<int>(buffer->size()));

::arrow::BooleanBuilder builder;
ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
decoder->DecodeArrow(static_cast<int>(expected->length()),
static_cast<int>(expected->null_count()),
expected->null_bitmap_data(), 0, &builder));
for (auto encoding : {Encoding::PLAIN, Encoding::RLE}) {
auto encoder = MakeTypedEncoder<BooleanType>(encoding,
/*use_dictionary=*/false);
for (const auto& array : arrays) {
encoder->Put(*array);
}
auto buffer = encoder->FlushValues();
auto decoder = MakeTypedDecoder<BooleanType>(encoding);
EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
static_cast<int>(buffer->size()));

::arrow::BooleanBuilder builder;
ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
decoder->DecodeArrow(static_cast<int>(expected->length()),
static_cast<int>(expected->null_count()),
expected->null_bitmap_data(), 0, &builder));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(builder.Finish(&result));
ASSERT_EQ(expected->length(), result->length());
::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
std::shared_ptr<::arrow::Array> result;
ASSERT_OK(builder.Finish(&result));
ASSERT_EQ(expected->length(), result->length());
::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
}
}

template <typename T>
Expand Down Expand Up @@ -963,8 +965,6 @@ TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
}

TYPED_TEST(EncodingAdHocTyped, RleArrowDirectPut) {
// TODO: test with nulls once RleBooleanDecoder::DecodeArrow supports them
this->null_probability_ = 0;
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
this->Rle(seed);
}
Expand Down

0 comments on commit b99b00d

Please sign in to comment.