Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44081: [C++][Parquet] Fix reported metrics in parquet-arrow-reader-writer-benchmark #44082

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 56 additions & 39 deletions cpp/src/parquet/arrow/reader_writer_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <array>
#include <iostream>
#include <random>
#include <type_traits>

#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
Expand All @@ -37,6 +38,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging.h"

Expand All @@ -45,6 +47,7 @@ using arrow::ArrayVector;
using arrow::BooleanBuilder;
using arrow::FieldVector;
using arrow::NumericBuilder;
using arrow::Table;

#define EXIT_NOT_OK(s) \
do { \
Expand Down Expand Up @@ -104,13 +107,28 @@ std::shared_ptr<ColumnDescriptor> MakeSchema(Repetition::type repetition) {
repetition == Repetition::REPEATED);
}

template <bool nullable, typename ParquetType>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So nullable is unused previously?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as you can see.

template <typename ParquetType>
int64_t BytesForItems(int64_t num_items) {
static_assert(!std::is_same_v<ParquetType, FLBAType>,
"BytesForItems unsupported for FLBAType");
return num_items * sizeof(typename ParquetType::c_type);
}

template <>
int64_t BytesForItems<BooleanType>(int64_t num_items) {
return ::arrow::bit_util::BytesForBits(num_items);
}

template <>
int64_t BytesForItems<Float16LogicalType>(int64_t num_items) {
return num_items * sizeof(uint16_t);
}

template <typename ParquetType>
void SetBytesProcessed(::benchmark::State& state, int64_t num_values = BENCHMARK_SIZE) {
const int64_t items_processed = state.iterations() * num_values;
const int64_t bytes_processed = items_processed * sizeof(typename ParquetType::c_type);

state.SetItemsProcessed(bytes_processed);
state.SetBytesProcessed(bytes_processed);
state.SetItemsProcessed(items_processed);
state.SetBytesProcessed(BytesForItems<ParquetType>(items_processed));
}

constexpr int64_t kAlternatingOrNa = -1;
Expand All @@ -132,9 +150,9 @@ std::vector<T> RandomVector(int64_t true_percentage, int64_t vector_size,
}

template <typename ParquetType, typename ArrowType = ArrowType<ParquetType>>
std::shared_ptr<::arrow::Table> TableFromVector(
const std::vector<typename ArrowType::c_type>& vec, bool nullable,
int64_t null_percentage = kAlternatingOrNa) {
std::shared_ptr<Table> TableFromVector(const std::vector<typename ArrowType::c_type>& vec,
bool nullable,
int64_t null_percentage = kAlternatingOrNa) {
if (!nullable) {
ARROW_CHECK_EQ(null_percentage, kAlternatingOrNa);
}
Expand All @@ -153,13 +171,12 @@ std::shared_ptr<::arrow::Table> TableFromVector(

auto field = ::arrow::field("column", type, nullable);
auto schema = ::arrow::schema({field});
return ::arrow::Table::Make(schema, {array});
return Table::Make(schema, {array});
}

template <>
std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<bool>& vec,
bool nullable,
int64_t null_percentage) {
std::shared_ptr<Table> TableFromVector<BooleanType, ::arrow::BooleanType>(
const std::vector<bool>& vec, bool nullable, int64_t null_percentage) {
BooleanBuilder builder;
if (nullable) {
auto valid_bytes = RandomVector<bool>(/*true_percentage=*/null_percentage, vec.size(),
Expand All @@ -174,21 +191,21 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
auto schema = std::make_shared<::arrow::Schema>(
std::vector<std::shared_ptr<::arrow::Field>>({field}));
return ::arrow::Table::Make(schema, {array});
return Table::Make(schema, {array});
}

template <bool nullable, typename ParquetType>
static void BM_WriteColumn(::benchmark::State& state) {
using T = typename ParquetType::c_type;
std::vector<T> values(BENCHMARK_SIZE, static_cast<T>(128));
std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
std::shared_ptr<Table> table = TableFromVector<ParquetType>(values, nullable);

while (state.KeepRunning()) {
auto output = CreateOutputStream();
EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
}
SetBytesProcessed<nullable, ParquetType>(state);
SetBytesProcessed<ParquetType>(state);
}

BENCHMARK_TEMPLATE2(BM_WriteColumn, false, Int32Type);
Expand All @@ -205,8 +222,8 @@ BENCHMARK_TEMPLATE2(BM_WriteColumn, true, BooleanType);

int32_t kInfiniteUniqueValues = -1;

std::shared_ptr<::arrow::Table> RandomStringTable(int64_t length, int64_t unique_values,
int64_t null_percentage) {
std::shared_ptr<Table> RandomStringTable(int64_t length, int64_t unique_values,
int64_t null_percentage) {
std::shared_ptr<::arrow::DataType> type = ::arrow::utf8();
std::shared_ptr<::arrow::Array> arr;
::arrow::random::RandomArrayGenerator generator(/*seed=*/500);
Expand All @@ -219,12 +236,12 @@ std::shared_ptr<::arrow::Table> RandomStringTable(int64_t length, int64_t unique
/*min_length=*/3, /*max_length=*/32,
/*null_probability=*/null_probability);
}
return ::arrow::Table::Make(
return Table::Make(
::arrow::schema({::arrow::field("column", type, null_percentage > 0)}), {arr});
}

static void BM_WriteBinaryColumn(::benchmark::State& state) {
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
RandomStringTable(BENCHMARK_SIZE, state.range(1), state.range(0));

while (state.KeepRunning()) {
Expand Down Expand Up @@ -263,7 +280,7 @@ struct Examples<bool> {
static constexpr std::array<bool, 2> values() { return {false, true}; }
};

static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table& table,
static void BenchmarkReadTable(::benchmark::State& state, const Table& table,
std::shared_ptr<WriterProperties> properties,
int64_t num_values = -1, int64_t total_bytes = -1) {
auto output = CreateOutputStream();
Expand All @@ -278,7 +295,7 @@ static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table&
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));

std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadTable(&table));
}

Expand All @@ -291,7 +308,7 @@ static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table&
}
}

static void BenchmarkReadTable(::benchmark::State& state, const ::arrow::Table& table,
static void BenchmarkReadTable(::benchmark::State& state, const Table& table,
int64_t num_values = -1, int64_t total_bytes = -1) {
BenchmarkReadTable(state, table, default_writer_properties(), num_values, total_bytes);
}
Expand All @@ -301,7 +318,7 @@ static void BenchmarkReadArray(::benchmark::State& state,
std::shared_ptr<WriterProperties> properties,
int64_t num_values = -1, int64_t total_bytes = -1) {
auto schema = ::arrow::schema({field("s", array->type(), nullable)});
auto table = ::arrow::Table::Make(schema, {array}, array->length());
auto table = Table::Make(schema, {array}, array->length());

EXIT_NOT_OK(table->Validate());

Expand All @@ -326,13 +343,13 @@ static void BM_ReadColumn(::benchmark::State& state) {
auto values = RandomVector<T>(/*percentage=*/state.range(1), BENCHMARK_SIZE,
Examples<T>::values());

std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
TableFromVector<ParquetType>(values, nullable, state.range(0));

auto properties = WriterProperties::Builder().disable_dictionary()->build();

BenchmarkReadTable(state, *table, properties, table->num_rows(),
sizeof(typename ParquetType::c_type) * table->num_rows());
BytesForItems<ParquetType>(table->num_rows()));
}

// There are two parameters here that cover different data distributions.
Expand Down Expand Up @@ -403,12 +420,12 @@ static void BM_ReadColumnPlain(::benchmark::State& state) {
using c_type = typename ArrowType<ParquetType>::c_type;

const std::vector<c_type> values(BENCHMARK_SIZE, static_cast<c_type>(42));
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
TableFromVector<ParquetType>(values, /*nullable=*/nullable, state.range(0));

auto properties = WriterProperties::Builder().disable_dictionary()->build();
BenchmarkReadTable(state, *table, properties, table->num_rows(),
sizeof(c_type) * table->num_rows());
BytesForItems<ParquetType>(table->num_rows()));
}

BENCHMARK_TEMPLATE2(BM_ReadColumnPlain, false, Int32Type)
Expand Down Expand Up @@ -438,7 +455,7 @@ BENCHMARK_TEMPLATE2(BM_ReadColumnPlain, true, Float16LogicalType)
//

static void BM_ReadBinaryColumn(::benchmark::State& state) {
std::shared_ptr<::arrow::Table> table =
std::shared_ptr<Table> table =
RandomStringTable(BENCHMARK_SIZE, state.range(1), state.range(0));

// Offsets + data
Expand Down Expand Up @@ -636,7 +653,7 @@ BENCHMARK(BM_ReadListOfListColumn)->Apply(NestedReadArguments);

static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
Expand All @@ -651,27 +668,27 @@ static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));

std::vector<std::shared_ptr<::arrow::Table>> tables;
std::vector<std::shared_ptr<Table>> tables;
for (int i = 0; i < arrow_reader->num_row_groups(); i++) {
// Only read the even numbered RowGroups
if ((i % 2) == 0) {
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->RowGroup(i)->ReadTable(&table));
tables.push_back(table);
}
}

std::shared_ptr<::arrow::Table> final_table;
std::shared_ptr<Table> final_table;
PARQUET_ASSIGN_OR_THROW(final_table, ConcatenateTables(tables));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadIndividualRowGroups);

static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
Expand All @@ -685,17 +702,17 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::unique_ptr<FileReader> arrow_reader;
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadMultipleRowGroups);

static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
std::shared_ptr<Table> table = TableFromVector<Int64Type>(values, true);
auto output = CreateOutputStream();
// This writes 10 RowGroups
EXIT_NOT_OK(
Expand All @@ -714,9 +731,9 @@ static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) {
arrow_reader->GetRecordBatchGenerator(arrow_reader, rgs, {0}));
auto fut = ::arrow::CollectAsyncGenerator(generator);
ASSIGN_OR_ABORT(auto batches, fut.result());
ASSIGN_OR_ABORT(auto actual, ::arrow::Table::FromRecordBatches(std::move(batches)));
ASSIGN_OR_ABORT(auto actual, Table::FromRecordBatches(std::move(batches)));
}
SetBytesProcessed<true, Int64Type>(state);
SetBytesProcessed<Int64Type>(state);
}

BENCHMARK(BM_ReadMultipleRowGroupsGenerator);
Expand Down
Loading