Skip to content

Commit

Permalink
Improve the test data for pylibcudf I/O tests (#16247)
Browse files Browse the repository at this point in the history
Don't just use random integers for every data type.

Decided not to use hypothesis since I don't think there's a good way to re-use the table across calls
(and I would like to keep the runtime of pylibcudf tests down).

Authors:
  - Thomas Li (https://github.com/lithomas1)

Approvers:
  - https://github.com/brandon-b-miller

URL: #16247
  • Loading branch information
lithomas1 authored Jul 12, 2024
1 parent 30e3209 commit 1ff7461
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 71 deletions.
40 changes: 40 additions & 0 deletions python/cudf/cudf/pylibcudf_tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ def is_nested_list(typ):
return nesting_level(typ)[0] > 1


def write_source_str(source, input_str):
    """
    Write ``input_str`` to ``source``, which may be a file path or an
    in-memory buffer (useful for testing CSV/JSON I/O).

    Buffers are rewound to the start after writing; BytesIO targets get
    the string UTF-8 encoded first.
    """
    if isinstance(source, io.IOBase):
        payload = (
            input_str.encode("utf-8")
            if isinstance(source, io.BytesIO)
            else input_str
        )
        source.write(payload)
        source.seek(0)
    else:
        with open(source, "w") as f:
            f.write(input_str)


def sink_to_str(sink):
"""
Takes a sink (e.g. StringIO/BytesIO, filepath, etc.)
Expand All @@ -192,6 +207,31 @@ def sink_to_str(sink):
return str_result


def make_source(path_or_buf, pa_table, format, **kwargs):
    """
    Write a pyarrow Table to ``path_or_buf`` in the given format using
    pandas, dispatching to the appropriate ``DataFrame.to_*`` call.

    Parameters
    ----------
    path_or_buf : str or io.IOBase
        Destination path or in-memory buffer; buffers are rewound to the
        start after writing so they can be read back immediately.
    pa_table
        Object exposing ``to_pandas()`` (e.g. a pyarrow Table).
    format : str
        Either "json" or "csv".
    **kwargs
        Forwarded to the pandas writer. The caller is responsible for
        making sure that no arguments unsupported by pandas are passed
        in. A "compression" value is translated through
        ``COMPRESSION_TYPE_TO_PANDAS`` first.

    Returns
    -------
    ``path_or_buf``, for convenient chaining.

    Raises
    ------
    NotImplementedError
        If ``format`` is not a supported format.
    """
    df = pa_table.to_pandas()
    mode = "w"
    if "compression" in kwargs:
        kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[
            kwargs["compression"]
        ]
        if kwargs["compression"] is not None and format != "json":
            # Compressed output must be written in binary mode.
            # (pandas json method only supports mode="w"/"a")
            mode = "wb"
    if format == "json":
        df.to_json(path_or_buf, mode=mode, **kwargs)
    elif format == "csv":
        df.to_csv(path_or_buf, mode=mode, **kwargs)
    else:
        # Previously an unknown format fell through silently, returning
        # an empty source; fail loudly instead.
        raise NotImplementedError(
            f"make_source does not support format={format!r}"
        )
    if isinstance(path_or_buf, io.IOBase):
        path_or_buf.seek(0)
    return path_or_buf


NUMERIC_PA_TYPES = [pa.int64(), pa.float64(), pa.uint64()]
STRING_PA_TYPES = [pa.string()]
BOOL_PA_TYPES = [pa.bool_()]
Expand Down
70 changes: 66 additions & 4 deletions python/cudf/cudf/pylibcudf_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytest

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.types import CompressionType

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common"))

Expand All @@ -37,6 +38,37 @@ def numeric_pa_type(request):
return request.param


def _get_vals_of_type(pa_type, length, seed):
    """
    Return a list-like of ``length`` random values matching ``pa_type``.

    Generation is deterministic for a given (pa_type, length, seed).
    Raises NotImplementedError for types without a generator.
    """
    rng = np.random.default_rng(seed=seed)
    if pa_type == pa.int64():
        if length == 0:
            # rng.integers raises "low >= high" when both bounds
            # collapse to 0, so short-circuit the empty request
            # (the table_data fixture is parametrized with nrows=0).
            return np.empty(0, dtype=np.int64)
        half = length // 2
        # Mix negative and non-negative values.
        negs = rng.integers(-length, 0, half, dtype=np.int64)
        pos = rng.integers(0, length, length - half, dtype=np.int64)
        return np.concatenate([negs, pos])
    elif pa_type == pa.uint64():
        if length == 0:
            # Same "low >= high" guard as the int64 branch.
            return np.empty(0, dtype=np.uint64)
        return rng.integers(0, length, length, dtype=np.uint64)
    elif pa_type == pa.float64():
        # Round to 6 decimal places or else we have problems comparing our
        # output to pandas due to floating point/rounding differences
        return rng.uniform(-length, length, length).round(6)
    elif pa_type == pa.bool_():
        return rng.integers(0, 2, length, dtype=bool)
    elif pa_type == pa.string():
        # Generate random ASCII strings; each string is `length`
        # characters drawn from codepoints 33..127 (note: 127 is DEL,
        # technically not printable — TODO confirm this is intended).
        strs = []
        for _ in range(length):
            chrs = rng.integers(33, 128, length)
            strs.append("".join(chr(x) for x in chrs))
        return strs
    else:
        raise NotImplementedError(
            f"random data generation not implemented for {pa_type}"
        )


# TODO: Consider adding another fixture/adapting this
# fixture to consider nullability
@pytest.fixture(scope="session", params=[0, 100])
Expand All @@ -57,10 +89,9 @@ def table_data(request):
# plc.io.TableWithMetadata
colnames = []

np.random.seed(42)
seed = 42

for typ in ALL_PA_TYPES:
rand_vals = np.random.randint(0, nrows, nrows)
child_colnames = []

def _generate_nested_data(typ):
Expand Down Expand Up @@ -88,13 +119,17 @@ def _generate_nested_data(typ):
child_colnames.append(("", grandchild_colnames))
else:
# typ is scalar type
pa_array = pa.array(rand_vals).cast(typ)
pa_array = pa.array(
_get_vals_of_type(typ, nrows, seed=seed), type=typ
)
return pa_array, child_colnames

if isinstance(typ, (pa.ListType, pa.StructType)):
rand_arr, child_colnames = _generate_nested_data(typ)
else:
rand_arr = pa.array(rand_vals).cast(typ)
rand_arr = pa.array(
_get_vals_of_type(typ, nrows, seed=seed), type=typ
)

table_dict[f"col_{typ}"] = rand_arr
colnames.append((f"col_{typ}", child_colnames))
Expand All @@ -121,6 +156,33 @@ def source_or_sink(request, tmp_path):
return fp_or_buf()


# Compression types we cannot round-trip in tests because pandas (our
# reference writer) does not support them.
# TODO: find a way to test these
unsupported_types = {
    CompressionType.SNAPPY,
    CompressionType.BROTLI,
    CompressionType.LZ4,
    CompressionType.LZO,
    CompressionType.ZLIB,
}

# For text formats (CSV/JSON), libcudf additionally does not support
# XZ/ZSTD, so exclude those as well.
unsupported_text_compression_types = unsupported_types | {
    CompressionType.XZ,
    CompressionType.ZSTD,
}


# Sort the params so the parametrization order is deterministic: set
# iteration order is not guaranteed to be stable across processes, and
# an unstable order produces unstable test IDs and can break
# pytest-xdist's "same tests collected on every worker" check.
@pytest.fixture(
    params=sorted(
        set(CompressionType).difference(unsupported_text_compression_types),
        key=str,
    )
)
def text_compression_type(request):
    """Compression types usable for CSV/JSON test round-trips."""
    return request.param


# list(...) instead of an identity comprehension (ruff C416); enum
# definition order keeps the parametrization deterministic.
@pytest.fixture(params=list(plc.io.types.CompressionType))
def compression_type(request):
    """Fixture over every CompressionType member, supported or not."""
    return request.param
Expand Down
85 changes: 18 additions & 67 deletions python/cudf/cudf/pylibcudf_tests/io/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,17 @@
import pyarrow as pa
import pytest
from utils import (
COMPRESSION_TYPE_TO_PANDAS,
assert_table_and_meta_eq,
make_source,
sink_to_str,
write_source_str,
)

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.types import CompressionType


def make_json_source(path_or_buf, pa_table, **kwargs):
    """
    Write ``pa_table`` (a pyarrow Table) to ``path_or_buf`` as JSON,
    using pandas with ``orient="records"``.

    The caller is responsible for making sure that no arguments
    unsupported by pandas are passed in. Buffers are rewound to the
    start after writing.
    """
    if "compression" in kwargs:
        # Translate the plc compression enum into pandas' spelling.
        kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[
            kwargs["compression"]
        ]
    pa_table.to_pandas().to_json(path_or_buf, orient="records", **kwargs)
    if isinstance(path_or_buf, io.IOBase):
        path_or_buf.seek(0)
    return path_or_buf


def write_json_bytes(source, json_str):
    """
    Write a JSON string to ``source`` (a file path or buffer).

    BytesIO targets get the string UTF-8 encoded; buffers are rewound
    to the start after writing.
    """
    if isinstance(source, io.IOBase):
        payload = json_str
        if isinstance(source, io.BytesIO):
            payload = json_str.encode("utf-8")
        source.write(payload)
        source.seek(0)
    else:
        with open(source, "w") as f:
            f.write(json_str)
# Kwargs shared by every make_source() call in this module: write JSON
# with one object per table row (orient="records").
_COMMON_JSON_SOURCE_KWARGS = {"format": "json", "orient": "records"}


@pytest.mark.parametrize("rows_per_chunk", [8, 100])
Expand Down Expand Up @@ -156,44 +128,22 @@ def test_write_json_bool_opts(true_value, false_value):

@pytest.mark.parametrize("lines", [True, False])
def test_read_json_basic(
table_data, source_or_sink, lines, compression_type, request
table_data, source_or_sink, lines, text_compression_type
):
if compression_type in {
# Not supported by libcudf
CompressionType.SNAPPY,
CompressionType.XZ,
CompressionType.ZSTD,
# Not supported by pandas
# TODO: find a way to test these
CompressionType.BROTLI,
CompressionType.LZ4,
CompressionType.LZO,
CompressionType.ZLIB,
}:
pytest.skip("unsupported compression type by pandas/libcudf")
compression_type = text_compression_type

# can't compress non-binary data with pandas
if isinstance(source_or_sink, io.StringIO):
compression_type = CompressionType.NONE

_, pa_table = table_data

source = make_json_source(
source_or_sink, pa_table, lines=lines, compression=compression_type
)

request.applymarker(
pytest.mark.xfail(
condition=(
len(pa_table) > 0
and compression_type
not in {CompressionType.NONE, CompressionType.AUTO}
),
# note: wasn't able to narrow down the specific types that were failing
# seems to be a little non-deterministic, but always fails with
# cudaErrorInvalidValue invalid argument
reason="libcudf json reader crashes on compressed non empty table_data",
)
source = make_source(
source_or_sink,
pa_table,
lines=lines,
compression=compression_type,
**_COMMON_JSON_SOURCE_KWARGS,
)

if isinstance(source, io.IOBase):
Expand Down Expand Up @@ -237,10 +187,11 @@ def test_read_json_dtypes(table_data, source_or_sink):
# Simple test for dtypes where we read in
# all numeric data as floats
_, pa_table = table_data
source = make_json_source(
source = make_source(
source_or_sink,
pa_table,
lines=True,
**_COMMON_JSON_SOURCE_KWARGS,
)

dtypes = []
Expand Down Expand Up @@ -295,7 +246,7 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size):
pytest.skip("byte_range doesn't work on StringIO")

json_str = "[1, 2, 3]\n[4, 5, 6]\n[7, 8, 9]\n"
write_json_bytes(source, json_str)
write_source_str(source, json_str)

tbls_w_meta = []
for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size):
Expand Down Expand Up @@ -331,7 +282,7 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink):
source = source_or_sink

json_bytes = '["a", "b", "c"]\n'
write_json_bytes(source, json_bytes)
write_source_str(source, json_bytes)

tbl_w_meta = plc.io.json.read_json(
plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes
Expand Down Expand Up @@ -359,8 +310,8 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink):
def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink):
source = source_or_sink

json_bytes = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n'
write_json_bytes(source, json_bytes)
json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n'
write_source_str(source, json_str)

if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL:
with pytest.raises(RuntimeError):
Expand Down

0 comments on commit 1ff7461

Please sign in to comment.