[Datasets] Add benchmark for read_parquet() API (#31483)
read_parquet() is one of the most commonly used APIs for loading a Dataset, so we should add benchmark tests to make sure it works efficiently.
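For reference, a minimal invocation of the API under test — an illustrative sketch, not part of this change; the S3 path is the downsampled NYC taxi file the benchmark below reads, and parallelism/use_threads are the knobs it varies:

import ray

ray.init()

# Read the downsampled 2009 NYC taxi data with two read tasks and
# single-threaded PyArrow file reads.
ds = ray.data.read_parquet(
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet",
    parallelism=2,
    use_threads=False,
)
print(ds.count())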

Signed-off-by: jianoaix <iamjianxiao@gmail.com>
jianoaix authored Jan 10, 2023
1 parent d970332 commit 2182af1
Showing 3 changed files with 216 additions and 0 deletions.
108 changes: 108 additions & 0 deletions release/nightly_tests/dataset/parquet_data_generator.py
@@ -0,0 +1,108 @@
import os

import pandas as pd
import numpy as np

import ray

#
# Data generation utilities for the parquet files.
#


def generate_data(
num_rows: int,
num_files: int,
num_row_groups_per_file: int,
compression: str,
data_dir: str,
):
results = []
for file_index, global_row_index in enumerate(
range(0, num_rows, num_rows // num_files)
):
num_rows_in_file = min(num_rows // num_files, num_rows - global_row_index)
results.append(
generate_file.remote(
file_index,
global_row_index,
num_rows_in_file,
num_row_groups_per_file,
compression,
data_dir,
)
)
filenames, data_sizes = zip(*ray.get(results))
return filenames, sum(data_sizes)


@ray.remote
def generate_file(
file_index: int,
global_row_index: int,
num_rows_in_file: int,
num_row_groups_per_file: int,
compression: str,
data_dir: str,
):
buffs = []
for group_index, group_global_row_index in enumerate(
range(0, num_rows_in_file, num_rows_in_file // num_row_groups_per_file)
):
num_rows_in_group = min(
num_rows_in_file // num_row_groups_per_file,
num_rows_in_file - group_global_row_index,
)
buffs.append(
generate_row_group(group_index, group_global_row_index, num_rows_in_group)
)
df = pd.concat(buffs)
data_size = df.memory_usage(deep=True).sum()
filename = os.path.join(data_dir, f"input_data_{file_index}.parquet.snappy")
df.to_parquet(
filename,
compression=compression,
row_group_size=num_rows_in_file // num_row_groups_per_file,
)
return filename, data_size


# TODO(jian): Enhance the parquet content:
# 1) support more data types;
# 2) support data skew.

DATA_SPEC = {
"embeddings_name0": (0, 2385, np.int64),
"embeddings_name1": (0, 201, np.int64),
"embeddings_name2": (0, 201, np.int64),
"embeddings_name3": (0, 6, np.int64),
"embeddings_name4": (0, 19, np.int64),
"embeddings_name5": (0, 1441, np.int64),
"embeddings_name6": (0, 201, np.int64),
"embeddings_name7": (0, 22, np.int64),
"embeddings_name8": (0, 156, np.int64),
"embeddings_name9": (0, 1216, np.int64),
"embeddings_name10": (0, 9216, np.int64),
"embeddings_name11": (0, 88999, np.int64),
"embeddings_name12": (0, 941792, np.int64),
"embeddings_name13": (0, 9405, np.int64),
"embeddings_name14": (0, 83332, np.int64),
"embeddings_name15": (0, 828767, np.int64),
"embeddings_name16": (0, 945195, np.int64),
"one_hot0": (0, 3, np.int64),
"one_hot1": (0, 50, np.int64),
"labels": (0, 1, np.float64),
}


def generate_row_group(group_index: int, global_row_index: int, num_rows_in_group: int):
buffer = {
"key": np.array(range(global_row_index, global_row_index + num_rows_in_group)),
}
for col, (low, high, dtype) in DATA_SPEC.items():
if dtype in (np.int16, np.int32, np.int64):
buffer[col] = np.random.randint(low, high, num_rows_in_group, dtype=dtype)
elif dtype in (np.float32, np.float64):
buffer[col] = (high - low) * np.random.rand(num_rows_in_group) + low

return pd.DataFrame(buffer)
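For context, a short driver-side sketch of how this generator might be exercised on its own; the row/file counts and temp directory below are illustrative and mirror what read_parquet_benchmark.py does:

import tempfile

import ray

from parquet_data_generator import generate_data

ray.init()

# Write 1M rows split across 8 snappy-compressed files, 16 row groups each.
data_dir = tempfile.mkdtemp()
filenames, total_bytes = generate_data(
    num_rows=1024 * 1024,
    num_files=8,
    num_row_groups_per_file=16,
    compression="snappy",
    data_dir=data_dir,
)
print(f"Wrote {len(filenames)} files, approx. {total_bytes} bytes of in-memory data.")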
90 changes: 90 additions & 0 deletions release/nightly_tests/dataset/read_parquet_benchmark.py
@@ -0,0 +1,90 @@
import pyarrow as pa

import ray
from ray.data.dataset import Dataset

from benchmark import Benchmark
from parquet_data_generator import generate_data

import shutil
import tempfile


def read_parquet(
root: str,
parallelism: int = -1,
use_threads: bool = False,
filter=None,
columns=None,
) -> Dataset:
return ray.data.read_parquet(
paths=root,
parallelism=parallelism,
use_threads=use_threads,
filter=filter,
columns=columns,
).fully_executed()


def run_read_parquet_benchmark(benchmark: Benchmark):
# Test with different parallelism (multi-processing for single node) and threading.
for parallelism in [1, 2, 4]:
for use_threads in [True, False]:
test_name = f"read-parquet-downsampled-nyc-taxi-2009-{parallelism}-{use_threads}" # noqa: E501
benchmark.run(
test_name,
read_parquet,
root="s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet", # noqa: E501
parallelism=parallelism,
use_threads=use_threads,
)

# Test with projection and filter pushdowns.
    # With projection and filter pushdown, the read over a full year of data is
    # fast enough to run on a single node.
test_name = "read-parquet-nyc-taxi-2018-pushdown"
filter_expr = (pa.dataset.field("passenger_count") <= 10) & (
pa.dataset.field("passenger_count") > 0
)
benchmark.run(
test_name,
read_parquet,
root="s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2018",
columns=["passenger_count", "trip_distance"],
filter=filter_expr,
)

    # Test with different numbers of files to handle: from a few to many.
    data_dirs = []
    # Each test set has the same total number of rows, distributed across a
    # different number of files.
total_rows = 1024 * 1024 * 8
for num_files in [8, 128, 1024]:
for compression in ["snappy", "gzip"]:
data_dirs.append(tempfile.mkdtemp())
generate_data(
num_rows=total_rows,
num_files=num_files,
num_row_groups_per_file=16,
compression=compression,
data_dir=data_dirs[-1],
)
test_name = f"read-parquet-random-data-{num_files}-{compression}"
benchmark.run(
test_name,
read_parquet,
root=data_dirs[-1],
parallelism=1, # We are testing one task to handle N files
)
for dir in data_dirs:
shutil.rmtree(dir)


if __name__ == "__main__":
ray.init()

benchmark = Benchmark("read-parquet")

run_read_parquet_benchmark(benchmark)

benchmark.write_result()
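As a rough mental model of what one benchmark case measures — assuming Benchmark.run essentially times the wall clock of the callable, which is not shown in this diff — the pushdown case could be reproduced standalone like this:

import time

import pyarrow.dataset as pads

import ray

ray.init()

# Project two columns and push a passenger_count filter down to the Parquet reader.
filter_expr = (pads.field("passenger_count") <= 10) & (
    pads.field("passenger_count") > 0
)

start = time.perf_counter()
ds = ray.data.read_parquet(
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2018",
    columns=["passenger_count", "trip_distance"],
    filter=filter_expr,
).fully_executed()
print(f"Pushdown read finished in {time.perf_counter() - start:.1f}s ({ds.count()} rows).")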
18 changes: 18 additions & 0 deletions release/release_tests.yaml
@@ -4287,6 +4287,24 @@
script: python aggregate_benchmark.py


- name: read_parquet_benchmark_single_node
group: core-dataset-tests
working_dir: nightly_tests/dataset

frequency: nightly
team: data
cluster:
cluster_env: app_config.yaml
cluster_compute: single_node_benchmark_compute.yaml

run:
    # Expect the benchmark to finish in less than 4 minutes.
timeout: 300
script: python read_parquet_benchmark.py

type: sdk_command
file_manager: sdk

- name: read_images_benchmark_single_node
group: core-dataset-tests
working_dir: nightly_tests/dataset
