GH-44081: [C++][Parquet] Fix reported metrics in parquet-arrow-reader-writer-benchmark

1. items/sec and bytes/sec were set to the same value in some benchmarks
2. bytes/sec was incorrectly computed for boolean columns
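
Concretely (derived from the diff below, not part of the original message): the old SetBytesProcessed helper computed bytes_processed = items_processed * sizeof(c_type) and passed that same value to both SetItemsProcessed and SetBytesProcessed, so an Int64 benchmark over N values reported 8 * N items per iteration; and because sizeof(bool) == 1, boolean columns were counted at one byte per value rather than the bit-packed BytesForBits(N), roughly N / 8 bytes.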
pitrou committed Sep 12, 2024
1 parent 9986b7b commit 726e7de
Showing 1 changed file with 56 additions and 39 deletions.
95 changes: 56 additions & 39 deletions cpp/src/parquet/arrow/reader_writer_benchmark.cc
@@ -20,6 +20,7 @@
#include <array>
#include <iostream>
#include <random>
#include <type_traits>

#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
@@ -37,6 +38,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging.h"

@@ -45,6 +47,7 @@ using arrow::ArrayVector;
using arrow::BooleanBuilder;
using arrow::FieldVector;
using arrow::NumericBuilder;
using arrow::Table;

#define EXIT_NOT_OK(s) \
do { \
@@ -104,13 +107,28 @@ std::shared_ptr<ColumnDescriptor> MakeSchema(Repetition::type repetition) {
repetition == Repetition::REPEATED);
}

template <bool nullable, typename ParquetType>
template <typename ParquetType>
int64_t BytesForItems(int64_t num_items) {
static_assert(!std::is_same_v<ParquetType, FLBAType>,
"BytesForItems unsupported for FLBAType");
return num_items * sizeof(typename ParquetType::c_type);
}

template <>
int64_t BytesForItems<BooleanType>(int64_t num_items) {
return ::arrow::bit_util::BytesForBits(num_items);
}

template <>
int64_t BytesForItems<Float16LogicalType>(int64_t num_items) {
return num_items * sizeof(uint16_t);
}

template <typename ParquetType>
void SetBytesProcessed(::benchmark::State& state, int64_t num_values = BENCHMARK_SIZE) {
const int64_t items_processed = state.iterations() * num_values;
const int64_t bytes_processed = items_processed * sizeof(typename ParquetType::c_type);

state.SetItemsProcessed(bytes_processed);
state.SetBytesProcessed(bytes_processed);
state.SetItemsProcessed(items_processed);
state.SetBytesProcessed(BytesForItems<ParquetType>(items_processed));
}

constexpr int64_t kAlternatingOrNa = -1;
@@ -132,9 +150,9 @@ std::vector<T> RandomVector(int64_t true_percentage, int64_t vector_size,
}

template <typename ParquetType, typename ArrowType = ArrowType<ParquetType>>
std::shared_ptr<::arrow::Table> TableFromVector(
const std::vector<typename ArrowType::c_type>& vec, bool nullable,
int64_t null_percentage = kAlternatingOrNa) {
std::shared_ptr<Table> TableFromVector(const std::vector<typename ArrowType::c_type>& vec,
bool nullable,
int64_t null_percentage = kAlternatingOrNa) {
if (!nullable) {
ARROW_CHECK_EQ(null_percentage, kAlternatingOrNa);
}
@@ -153,13 +171,12 @@ std::shared_ptr<::arrow::Table> TableFromVector(

auto field = ::arrow::field("column", type, nullable);
auto schema = ::arrow::schema({field});
return ::arrow::Table::Make(schema, {array});
return Table::Make(schema, {array});
}

template <>
std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<bool>& vec,
bool nullable,
int64_t null_percentage) {
std::shared_ptr<Table> TableFromVector<BooleanType, ::arrow::BooleanType>(
const std::vector<bool>& vec, bool nullable, int64_t null_percentage) {
BooleanBuilder builder;
if (nullable) {
auto valid_bytes = RandomVector<bool>(/*true_percentage=*/null_percentage, vec.size(),
@@ -174,21 +191,21 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
auto schema = std::make_shared<::arrow::Schema>(
std::vector<std::shared_ptr<::arrow::Field>>({field}));
return ::arrow::Table::Make(schema, {array});
return Table::Make(schema, {array});
}

template <bool nullable, typename ParquetType>
static void BM_WriteColumn(::benchmark::State& state) {
using T = typename ParquetType::c_type;
std::vector<T> values(BENCHMARK_SIZE, static_cast<T>(128));
std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
std::shared_ptr<Table> table = TableFromVector<ParquetType>(values, nullable);

while (state.KeepRunning()) {
auto output = CreateOutputStream();
EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
}
SetBytesProcessed<nullable, ParquetType>(state);
SetBytesProcessed<ParquetType>(state);
}

BENCHMARK_TEMPLATE2(BM_WriteColumn, false, Int32Type);
@@ -205,8 +222,8 @@ BENCHMARK_TEMPLATE2(BM_WriteColumn, true, BooleanType);

int32_t kInfiniteUniqueValues = -1;

std::shared_ptr<::arrow::Table> RandomStringTable(int64_t length, int64_t unique_values,
int64_t null_percentage) {
std::shared_ptr<Table> RandomStringTable(int64_t length, int64_t unique_values,
int64_t null_percentage) {
std::shared_ptr<::arrow::DataType> type = ::arrow::utf8();
std::shared_ptr<::arrow::Array> arr;
::arrow::random::RandomArrayGenerator generator(/*seed=*/500);
@@ -219,12 +236,12 @@ std::shared_ptr<::arrow::Table> RandomStringTable(int64_t length, int64_t unique
/*min_length=*/3, /*max_length=*/32,
/*null_probability=*/null_probability);
}
return ::arrow::Table::Make(
return Table::Make(
::arrow::schema({::arrow::field("column", type, null_percentage > 0)}), {arr});
}

static void BM_WriteBinaryColumn(::benchmark::State& state) {
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
RandomStringTable(BENCHMARK_SIZE, state.range(1), state.range(0));

while (state.KeepRunning()) {
@@ -263,7 +280,7 @@ struct Examples<bool> {
static constexpr std::array<bool, 2> values() { return {false, true}; }
};

static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table& table,
static void BenchmarkReadTable(::benchmark::State& state, const Table& table,
std::shared_ptr<WriterProperties> properties,
int64_t num_values = -1, int64_t total_bytes = -1) {
auto output = CreateOutputStream();
@@ -278,7 +295,7 @@ static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table&
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));

std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadTable(&table));
}

@@ -291,7 +308,7 @@ static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table&
}
}

static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table& table,
static void BenchmarkReadTable(::benchmark::State& state, const Table& table,
int64_t num_values = -1, int64_t total_bytes = -1) {
BenchmarkReadTable(state, table, default_writer_properties(), num_values, total_bytes);
}
@@ -301,7 +318,7 @@ static void BenchmarkReadArray(::benchmark::State& state,
std::shared_ptr<WriterProperties> properties,
int64_t num_values = -1, int64_t total_bytes = -1) {
auto schema = ::arrow::schema({field("s", array->type(), nullable)});
auto table = ::arrow::Table::Make(schema, {array}, array->length());
auto table = Table::Make(schema, {array}, array->length());

EXIT_NOT_OK(table->Validate());

@@ -326,13 +343,13 @@ static void BM_ReadColumn(::benchmark::State& state) {
auto values = RandomVector<T>(/*percentage=*/state.range(1), BENCHMARK_SIZE,
Examples<T>::values());

std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
TableFromVector<ParquetType>(values, nullable, state.range(0));

auto properties = WriterProperties::Builder().disable_dictionary()->build();

BenchmarkReadTable(state, *table, properties, table->num_rows(),
sizeof(typename ParquetType::c_type) * table->num_rows());
BytesForItems<ParquetType>(table->num_rows()));
}

// There are two parameters here that cover different data distributions.
@@ -403,12 +420,12 @@ static void BM_ReadColumnPlain(::benchmark::State& state) {
using c_type = typename ArrowType<ParquetType>::c_type;

const std::vector<c_type> values(BENCHMARK_SIZE, static_cast<c_type>(42));
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
TableFromVector<ParquetType>(values, /*nullable=*/nullable, state.range(0));

auto properties = WriterProperties::Builder().disable_dictionary()->build();
BenchmarkReadTable(state, *table, properties, table->num_rows(),
sizeof(c_type) * table->num_rows());
BytesForItems<ParquetType>(table->num_rows()));
}

BENCHMARK_TEMPLATE2(BM_ReadColumnPlain, false, Int32Type)
@@ -438,7 +455,7 @@ BENCHMARK_TEMPLATE2(BM_ReadColumnPlain, true, Float16LogicalType)
//

static void BM_ReadBinaryColumn(::benchmark::State& state) {
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
RandomStringTable(BENCHMARK_SIZE, state.range(1), state.range(0));

// Offsets + data
@@ -636,7 +653,7 @@ BENCHMARK(BM_ReadListOfListColumn)->Apply(NestedReadArguments);

static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
@@ -651,27 +668,27 @@ static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));

std::vector<std::shared_ptr<::arrow::Table>> tables;
std::vector<std::shared_ptr<Table>> tables;
for (int i = 0; i < arrow_reader->num_row_groups(); i++) {
// Only read the even numbered RowGroups
if ((i % 2) == 0) {
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->RowGroup(i)->ReadTable(&table));
tables.push_back(table);
}
}

std::shared_ptr<::arrow::Table> final_table;
std::shared_ptr<Table> final_table;
PARQUET_ASSIGN_OR_THROW(final_table, ConcatenateTables(tables));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadIndividualRowGroups);

static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
@@ -685,17 +702,17 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::unique_ptr<FileReader> arrow_reader;
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadMultipleRowGroups);

static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
@@ -714,9 +731,9 @@ static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) {
arrow_reader->GetRecordBatchGenerator(arrow_reader, rgs, {0}));
auto fut = ::arrow::CollectAsyncGenerator(generator);
ASSIGN_OR_ABORT(auto batches, fut.result());
ASSIGN_OR_ABORT(auto actual, ::arrow::Table::FromRecordBatches(std::move(batches)));
ASSIGN_OR_ABORT(auto actual, Table::FromRecordBatches(std::move(batches)));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadMultipleRowGroupsGenerator);

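For reference, a minimal self-contained sketch of the per-type byte accounting introduced above; it is not code from this commit. Int64Tag and BooleanTag are illustrative stand-ins for Parquet's Int64Type and BooleanType type traits, and the boolean case rounds up to whole bytes the same way ::arrow::bit_util::BytesForBits does.

#include <cstdint>
#include <iostream>

struct Int64Tag { using c_type = int64_t; };
struct BooleanTag { using c_type = bool; };

// Generic case: each value occupies sizeof(c_type) bytes.
template <typename TypeTag>
int64_t BytesForItems(int64_t num_items) {
  return num_items * static_cast<int64_t>(sizeof(typename TypeTag::c_type));
}

// Boolean values are bit-packed: eight values per byte, rounded up.
template <>
int64_t BytesForItems<BooleanTag>(int64_t num_items) {
  return (num_items + 7) / 8;
}

int main() {
  constexpr int64_t kNumValues = 1024 * 1024;
  std::cout << "int64 bytes:   " << BytesForItems<Int64Tag>(kNumValues) << "\n";    // 8388608
  std::cout << "boolean bytes: " << BytesForItems<BooleanTag>(kNumValues) << "\n";  // 131072
  return 0;
}

With the item count fed to SetItemsProcessed and a per-type byte count like this fed to SetBytesProcessed, items/sec and bytes/sec report distinct quantities, which is the point of the fix.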