Improve the test data for pylibcudf I/O tests #16247

Merged · 3 commits · Jul 12, 2024
40 changes: 40 additions & 0 deletions python/cudf/cudf/pylibcudf_tests/common/utils.py
@@ -174,6 +174,21 @@ def is_nested_list(typ):
    return nesting_level(typ)[0] > 1


def write_source_str(source, input_str):
    """
    Write a string to the source
    (useful for testing CSV/JSON I/O)
    """
    if not isinstance(source, io.IOBase):
        with open(source, "w") as source_f:
            source_f.write(input_str)
    else:
        if isinstance(source, io.BytesIO):
            input_str = input_str.encode("utf-8")
        source.write(input_str)
        source.seek(0)
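For context, a quick usage sketch (hypothetical snippet, not part of the diff) showing how the helper behaves for the source flavors produced by the source_or_sink fixture:

import io

# BytesIO: the string is encoded to UTF-8 before writing, and the
# buffer is rewound so a subsequent read starts at offset 0.
buf = io.BytesIO()
write_source_str(buf, '{"a": 1}\n')
assert buf.read() == b'{"a": 1}\n'

# StringIO: written as-is, also rewound.
sbuf = io.StringIO()
write_source_str(sbuf, '{"a": 1}\n')
assert sbuf.read() == '{"a": 1}\n'

# Anything that isn't an io.IOBase is treated as a file path and
# written in text mode; no seek is needed.
write_source_str("/tmp/example.json", '{"a": 1}\n')  # hypothetical path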


def sink_to_str(sink):
    """
    Takes a sink (e.g. StringIO/BytesIO, filepath, etc.)
@@ -192,6 +207,31 @@ def sink_to_str(sink):
    return str_result


def make_source(path_or_buf, pa_table, format, **kwargs):
    """
    Write a pyarrow Table to a specific format using pandas
    by dispatching to the appropriate to_* call.
    The caller is responsible for making sure that no arguments
    unsupported by pandas are passed in.
    """
    df = pa_table.to_pandas()
    mode = "w"
    if "compression" in kwargs:
        kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[
            kwargs["compression"]
        ]
        if kwargs["compression"] is not None and format != "json":
            # pandas json method only supports mode="w"/"a"
            mode = "wb"
    if format == "json":
        df.to_json(path_or_buf, mode=mode, **kwargs)
    elif format == "csv":
        df.to_csv(path_or_buf, mode=mode, **kwargs)
    if isinstance(path_or_buf, io.IOBase):
        path_or_buf.seek(0)
    return path_or_buf
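As an illustration (hypothetical snippet, not in the PR), writing a small pyarrow table through the CSV path looks like this; index=False is an ordinary pandas kwarg, which per the docstring is the caller's responsibility:

import io

import pyarrow as pa

tbl = pa.table({"a": [1, 2], "b": ["x", "y"]})

# format="csv" dispatches to DataFrame.to_csv; the returned buffer is
# already rewound by the seek(0) above, so it can be read immediately.
source = make_source(io.StringIO(), tbl, format="csv", index=False)
print(source.read())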


NUMERIC_PA_TYPES = [pa.int64(), pa.float64(), pa.uint64()]
STRING_PA_TYPES = [pa.string()]
BOOL_PA_TYPES = [pa.bool_()]
70 changes: 66 additions & 4 deletions python/cudf/cudf/pylibcudf_tests/conftest.py
@@ -11,6 +11,7 @@
import pytest

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.types import CompressionType

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common"))

@@ -37,6 +38,37 @@ def numeric_pa_type(request):
    return request.param


def _get_vals_of_type(pa_type, length, seed):
    """
    Returns a list-like of random values of that type
    """
    rng = np.random.default_rng(seed=seed)
    if pa_type == pa.int64():
        half = length // 2
        negs = rng.integers(-length, 0, half, dtype=np.int64)
        pos = rng.integers(0, length, length - half, dtype=np.int64)
        return np.concatenate([negs, pos])
    elif pa_type == pa.uint64():
        return rng.integers(0, length, length, dtype=np.uint64)
    elif pa_type == pa.float64():
        # Round to 6 decimal places or else we have problems comparing our
        # output to pandas due to floating point/rounding differences
        return rng.uniform(-length, length, length).round(6)
    elif pa_type == pa.bool_():
        return rng.integers(0, 2, length, dtype=bool)
    elif pa_type == pa.string():
        # Generate random ASCII strings
        strs = []
        for _ in range(length):
            chrs = rng.integers(33, 128, length)
            strs.append("".join(chr(x) for x in chrs))
        return strs
    else:
        raise NotImplementedError(
            f"random data generation not implemented for {pa_type}"
        )
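For example (hypothetical calls), the generator is deterministic for a fixed seed, which is what keeps the session-scoped table_data fixture reproducible:

import pyarrow as pa

# int64: the first half of the values is strictly negative, the rest
# non-negative, so both sign ranges get exercised.
vals = _get_vals_of_type(pa.int64(), length=6, seed=42)
assert (vals[:3] < 0).all()

# The same seed reproduces the same values on every call.
assert (vals == _get_vals_of_type(pa.int64(), length=6, seed=42)).all()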


# TODO: Consider adding another fixture/adapting this
# fixture to consider nullability
@pytest.fixture(scope="session", params=[0, 100])
@@ -57,10 +89,9 @@ def table_data(request):
    # plc.io.TableWithMetadata
    colnames = []

    np.random.seed(42)
    seed = 42

    for typ in ALL_PA_TYPES:
        rand_vals = np.random.randint(0, nrows, nrows)
        child_colnames = []

        def _generate_nested_data(typ):
@@ -88,13 +119,17 @@ def _generate_nested_data(typ):
                child_colnames.append(("", grandchild_colnames))
            else:
                # typ is scalar type
                pa_array = pa.array(rand_vals).cast(typ)
                pa_array = pa.array(
                    _get_vals_of_type(typ, nrows, seed=seed), type=typ
                )
            return pa_array, child_colnames

        if isinstance(typ, (pa.ListType, pa.StructType)):
            rand_arr, child_colnames = _generate_nested_data(typ)
        else:
            rand_arr = pa.array(rand_vals).cast(typ)
            rand_arr = pa.array(
                _get_vals_of_type(typ, nrows, seed=seed), type=typ
            )

        table_dict[f"col_{typ}"] = rand_arr
        colnames.append((f"col_{typ}", child_colnames))
@@ -121,6 +156,33 @@ def source_or_sink(request, tmp_path):
    return fp_or_buf()


unsupported_types = {
    # Not supported by pandas
    # TODO: find a way to test these
    CompressionType.SNAPPY,
    CompressionType.BROTLI,
    CompressionType.LZ4,
    CompressionType.LZO,
    CompressionType.ZLIB,
}

unsupported_text_compression_types = unsupported_types.union(
    {
        # compressions not supported by libcudf
        # for csv/json
        CompressionType.XZ,
        CompressionType.ZSTD,
    }
)


@pytest.fixture(
    params=set(CompressionType).difference(unsupported_text_compression_types)
)
def text_compression_type(request):
    return request.param

Review comment (Contributor Author): Added this since most formats don't support all of the compression types, so it probably makes sense to break it out into text (CSV/JSON) vs binary (ORC/Parquet).


@pytest.fixture(params=[opt for opt in plc.io.types.CompressionType])
def compression_type(request):
    return request.param
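A sketch of the intended division of labor (hypothetical test names): text-format tests request the narrower fixture, while binary formats keep the full enum:

# CSV/JSON tests only see codecs that both pandas and libcudf support
# for text formats.
def test_csv_roundtrip(table_data, source_or_sink, text_compression_type):
    ...

# ORC/Parquet tests can continue to use the unrestricted fixture.
def test_parquet_roundtrip(table_data, source_or_sink, compression_type):
    ...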
85 changes: 18 additions & 67 deletions python/cudf/cudf/pylibcudf_tests/io/test_json.py
@@ -5,45 +5,17 @@
import pyarrow as pa
import pytest
from utils import (
    COMPRESSION_TYPE_TO_PANDAS,
    assert_table_and_meta_eq,
    make_source,
    sink_to_str,
    write_source_str,
)

import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.types import CompressionType


def make_json_source(path_or_buf, pa_table, **kwargs):
    """
    Uses pandas to write a pyarrow Table to a JSON file.

    The caller is responsible for making sure that no arguments
    unsupported by pandas are passed in.
    """
    df = pa_table.to_pandas()
    if "compression" in kwargs:
        kwargs["compression"] = COMPRESSION_TYPE_TO_PANDAS[
            kwargs["compression"]
        ]
    df.to_json(path_or_buf, orient="records", **kwargs)
    if isinstance(path_or_buf, io.IOBase):
        path_or_buf.seek(0)
    return path_or_buf


def write_json_bytes(source, json_str):
    """
    Write a JSON string to the source
    """
    if not isinstance(source, io.IOBase):
        with open(source, "w") as source_f:
            source_f.write(json_str)
    else:
        if isinstance(source, io.BytesIO):
            json_str = json_str.encode("utf-8")
        source.write(json_str)
        source.seek(0)
# Shared kwargs to pass to make_source
_COMMON_JSON_SOURCE_KWARGS = {"format": "json", "orient": "records"}
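As a quick illustration (hypothetical snippet), splatting the shared kwargs is the same as passing format and orient explicitly, mirroring how the tests below call make_source:

import io

import pyarrow as pa

tbl = pa.table({"a": [1]})

# Equivalent to make_source(..., lines=True, format="json", orient="records")
src = make_source(io.StringIO(), tbl, lines=True, **_COMMON_JSON_SOURCE_KWARGS)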


@pytest.mark.parametrize("rows_per_chunk", [8, 100])
@@ -156,44 +128,22 @@ def test_write_json_bool_opts(true_value, false_value):

@pytest.mark.parametrize("lines", [True, False])
def test_read_json_basic(
    table_data, source_or_sink, lines, compression_type, request
    table_data, source_or_sink, lines, text_compression_type
):
    if compression_type in {
        # Not supported by libcudf
        CompressionType.SNAPPY,
        CompressionType.XZ,
        CompressionType.ZSTD,
        # Not supported by pandas
        # TODO: find a way to test these
        CompressionType.BROTLI,
        CompressionType.LZ4,
        CompressionType.LZO,
        CompressionType.ZLIB,
    }:
        pytest.skip("unsupported compression type by pandas/libcudf")
    compression_type = text_compression_type

    # can't compress non-binary data with pandas
    if isinstance(source_or_sink, io.StringIO):
        compression_type = CompressionType.NONE

    _, pa_table = table_data

    source = make_json_source(
        source_or_sink, pa_table, lines=lines, compression=compression_type
    )

    request.applymarker(
        pytest.mark.xfail(
            condition=(
                len(pa_table) > 0
                and compression_type
                not in {CompressionType.NONE, CompressionType.AUTO}
            ),
            # note: wasn't able to narrow down the specific types that were failing
            # seems to be a little non-deterministic, but always fails with
            # cudaErrorInvalidValue invalid argument
            reason="libcudf json reader crashes on compressed non empty table_data",
        )
    )

Review comment (Contributor Author): Now that the data is more well-formed, this isn't crashing anymore. I think there still might be an issue in libcudf's JSON reader, though. (Will open a followup issue if I can still reproduce, but it's a little hard to reproduce.)

    source = make_source(
        source_or_sink,
        pa_table,
        lines=lines,
        compression=compression_type,
        **_COMMON_JSON_SOURCE_KWARGS,
    )

    if isinstance(source, io.IOBase):
@@ -237,10 +187,11 @@ def test_read_json_dtypes(table_data, source_or_sink):
    # Simple test for dtypes where we read in
    # all numeric data as floats
    _, pa_table = table_data
    source = make_json_source(
    source = make_source(
        source_or_sink,
        pa_table,
        lines=True,
        **_COMMON_JSON_SOURCE_KWARGS,
    )

    dtypes = []
@@ -295,7 +246,7 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size):
        pytest.skip("byte_range doesn't work on StringIO")

    json_str = "[1, 2, 3]\n[4, 5, 6]\n[7, 8, 9]\n"
    write_json_bytes(source, json_str)
    write_source_str(source, json_str)

    tbls_w_meta = []
    for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size):
@@ -331,7 +282,7 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink):
    source = source_or_sink

    json_bytes = '["a", "b", "c"]\n'
    write_json_bytes(source, json_bytes)
    write_source_str(source, json_bytes)

    tbl_w_meta = plc.io.json.read_json(
        plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes
@@ -359,8 +310,8 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink):
def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink):
    source = source_or_sink

    json_bytes = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n'
    write_json_bytes(source, json_bytes)
    json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n'
    write_source_str(source, json_str)

    if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL:
        with pytest.raises(RuntimeError):