From 591cdd2a71fd2523ba3dc5d5183ddca695243d50 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 6 Jun 2024 23:54:58 +0000 Subject: [PATCH] Start migrating I/O writers to pylibcudf (starting with JSON) --- python/cudf/cudf/_lib/json.pyx | 101 +++++---------- .../cudf/_lib/pylibcudf/io/CMakeLists.txt | 4 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.pxd | 4 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.py | 4 +- python/cudf/cudf/_lib/pylibcudf/io/json.pxd | 19 +++ python/cudf/cudf/_lib/pylibcudf/io/json.pyx | 47 +++++++ python/cudf/cudf/_lib/pylibcudf/io/types.pxd | 11 ++ python/cudf/cudf/_lib/pylibcudf/io/types.pyx | 121 ++++++++++++++++++ .../cudf/_lib/pylibcudf/libcudf/io/types.pxd | 1 - 9 files changed, 236 insertions(+), 76 deletions(-) create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/json.pxd create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/json.pyx diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx index a8fef907bad..26ee1dc4554 100644 --- a/python/cudf/cudf/_lib/json.pyx +++ b/python/cudf/cudf/_lib/json.pyx @@ -9,38 +9,27 @@ from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool from libcpp.map cimport map -from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types -from cudf._lib.column cimport Column -from cudf._lib.io.utils cimport ( - make_sink_info, - make_source_info, - update_struct_field_names, -) -from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink +from cudf._lib.io.utils cimport make_source_info, update_struct_field_names from cudf._lib.pylibcudf.libcudf.io.json cimport ( json_reader_options, json_recovery_mode_t, - json_writer_options, read_json as libcudf_read_json, schema_element, - write_json as libcudf_write_json, ) from cudf._lib.pylibcudf.libcudf.io.types cimport ( - column_name_info, compression_type, - sink_info, - table_metadata, 
table_with_metadata, ) -from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type from cudf._lib.types cimport dtype_to_data_type -from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table +from cudf._lib.utils cimport data_from_unique_ptr + +import cudf._lib.pylibcudf as plc cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines): @@ -175,45 +164,27 @@ def write_json( -------- cudf.to_json """ - cdef table_view input_table_view = table_view_from_table( - table, ignore_index=True - ) + cdef list colnames = [] - cdef unique_ptr[data_sink] data_sink_c - cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c) - cdef string na_c = na_rep.encode() - cdef bool include_nulls_c = include_nulls - cdef bool lines_c = lines - cdef int rows_per_chunk_c = rows_per_chunk - cdef string true_value_c = 'true'.encode() - cdef string false_value_c = 'false'.encode() - cdef table_metadata tbl_meta - - num_index_cols_meta = 0 - cdef column_name_info child_info - for i, name in enumerate(table._column_names, num_index_cols_meta): - child_info.name = name.encode() - tbl_meta.schema_info.push_back(child_info) - _set_col_children_metadata( - table[name]._column, - tbl_meta.schema_info[i] - ) - - cdef json_writer_options options = move( - json_writer_options.builder(sink_info_c, input_table_view) - .metadata(tbl_meta) - .na_rep(na_c) - .include_nulls(include_nulls_c) - .lines(lines_c) - .rows_per_chunk(rows_per_chunk_c) - .true_value(true_value_c) - .false_value(false_value_c) - .build() - ) + for name in table._column_names: + colnames.append((name, _dtype_to_names_list(table[name]._column))) try: - with nogil: - libcudf_write_json(options) + plc.io.json.write_json( + plc.io.SinkInfo([path_or_buf]), + plc.io.TableWithMetadata( + plc.Table([ + c.to_pylibcudf(mode="read") for c in table._data.columns + ]), + colnames + ), + na_rep, + include_nulls, + lines, 
+ rows_per_chunk, + true_value="true", + false_value="false" + ) except OverflowError: raise OverflowError( f"Writing JSON file with rows_per_chunk={rows_per_chunk} failed. " @@ -254,23 +225,15 @@ cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: ) return dtype_to_data_type(dtype) -cdef _set_col_children_metadata(Column col, - column_name_info& col_meta): - cdef column_name_info child_info + +def _dtype_to_names_list(col): + cdef list child_names = [] if isinstance(col.dtype, cudf.StructDtype): - for i, (child_col, name) in enumerate( - zip(col.children, list(col.dtype.fields)) - ): - child_info.name = name.encode() - col_meta.children.push_back(child_info) - _set_col_children_metadata( - child_col, col_meta.children[i] - ) + for child_col, name in zip(col.children, list(col.dtype.fields)): + child_names.append((name, _dtype_to_names_list(child_col))) elif isinstance(col.dtype, cudf.ListDtype): - for i, child_col in enumerate(col.children): - col_meta.children.push_back(child_info) - _set_col_children_metadata( - child_col, col_meta.children[i] - ) - else: - return + for child_col in col.children: + list_child_names = _dtype_to_names_list(child_col) + child_names.append(("", list_child_names)) + + return child_names diff --git a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt index 2cfec101bab..0e85cfb0654 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. 
# ============================================================================= -set(cython_sources avro.pyx types.pyx) +set(cython_sources avro.pyx json.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -21,5 +21,5 @@ rapids_cython_create_modules( LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_io_ ASSOCIATED_TARGETS cudf ) -set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_types) +set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_json pylibcudf_io_types) link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd index 250292746c1..1bf355a1461 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . cimport avro, types -from .types cimport SourceInfo, TableWithMetadata +from . cimport avro, json, types +from .types cimport SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py index 5242c741911..9fc05451b3b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, types -from .types import SourceInfo, TableWithMetadata +from . import avro, json, types +from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd new file mode 100644 index 00000000000..e4191694000 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd @@ -0,0 +1,19 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+ +from libcpp cimport bool +from libcpp.string cimport string + +from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type + + +cpdef void write_json( + SinkInfo sink_info, + TableWithMetadata tbl, + str na_rep = *, + bool include_nulls = *, + bool lines = *, + int rows_per_chunk = *, + str true_value = *, + str false_value = * +) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx new file mode 100644 index 00000000000..78d4e16ff30 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx @@ -0,0 +1,47 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool +from libcpp.limits cimport numeric_limits +from libcpp.string cimport string +from libcpp.utility cimport move + +from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.json cimport ( + json_writer_options, + write_json as cpp_write_json, +) +from cudf._lib.pylibcudf.libcudf.io.types cimport table_metadata +from cudf._lib.pylibcudf.types cimport size_type + + +cpdef void write_json( + SinkInfo sink_info, + TableWithMetadata table_w_meta, + str na_rep = "", + bool include_nulls = False, + bool lines = False, + int rows_per_chunk = numeric_limits[size_type].max(), + str true_value = "true", + str false_value = "false" +): + """Write the table in ``table_w_meta`` to ``sink_info`` as JSON. 
+ """ + cdef table_metadata tbl_meta = table_w_meta.metadata + cdef string na_rep_c = na_rep.encode() + cdef string true_value_c = true_value.encode() + cdef string false_value_c = false_value.encode() + + cdef json_writer_options options = move( + json_writer_options.builder(sink_info.c_obj, table_w_meta.tbl.view()) + .metadata(tbl_meta) + .na_rep(na_rep_c) + .include_nulls(include_nulls) + .lines(lines) + .rows_per_chunk(rows_per_chunk) + .true_value(true_value_c) + .false_value(false_value_c) + .build() + ) + + with nogil: + cpp_write_json(options) diff --git 
a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd index aa846a47343..88daf54f33b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd @@ -1,4 +1,8 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.types cimport ( column_encoding, column_in_metadata, @@ -22,8 +26,15 @@ cdef class TableWithMetadata: cdef public Table tbl cdef table_metadata metadata + cdef vector[column_name_info] _make_column_info(self, list column_names) + @staticmethod cdef TableWithMetadata from_libcudf(table_with_metadata& tbl) cdef class SourceInfo: cdef source_info c_obj + +cdef class SinkInfo: + # This vector just exists to keep the unique_ptrs to the sinks alive + cdef vector[unique_ptr[data_sink]] sink_storage + cdef sink_info c_obj diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx index cd777232b33..4cacf355752 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -1,15 +1,21 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from cpython.buffer cimport PyBUF_READ +from cpython.memoryview cimport PyMemoryView_FromMemory +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.types cimport ( + column_name_info, host_buffer, source_info, table_with_metadata, ) +import codecs import errno import io import os @@ -20,7 +26,39 @@ cdef class TableWithMetadata: (e.g. column names) For details, see :cpp:class:`cudf::io::table_with_metadata`. + + Parameters + ---------- + tbl: Table + The input table. 
+ column_names: list + A list of tuples each containing the name of each column + and the names of its child columns (in the same format). + e.g. + [("id", []), ("name", [("first", []), ("last", [])])] + """ + def __init__(self, Table tbl, list column_names): + self.tbl = tbl + + self.metadata.schema_info = move(self._make_column_info(column_names)) + + cdef vector[column_name_info] _make_column_info(self, list column_names): + cdef vector[column_name_info] col_name_infos + cdef column_name_info info + + col_name_infos.reserve(len(column_names)) + + for name, child_names in column_names: + if not isinstance(name, str): + raise ValueError("Column name must be a string!") + + info.name = move(<string> name.encode()) + info.children = move(self._make_column_info(child_names)) + + col_name_infos.push_back(info) + + return col_name_infos @property def columns(self): @@ -49,6 +87,7 @@ cdef class TableWithMetadata: out.metadata = tbl_with_meta.metadata return out + cdef class SourceInfo: """A class containing details on a source to read from. @@ -108,3 +147,85 @@ cdef class SourceInfo: c_buffer.shape[0])) self.c_obj = source_info(c_host_buffers) + + +# Adapts a python io.IOBase object as a libcudf IO data_sink. This lets you +# write from cudf to any python file-like object (File/BytesIO/SocketIO etc) +cdef cppclass iobase_data_sink(data_sink): + object buf + + iobase_data_sink(object buf_): + this.buf = buf_ + + void host_write(const void * data, size_t size) with gil: + if isinstance(buf, io.StringIO): + buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ) + .tobytes().decode()) + else: + buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ)) + + void flush() with gil: + buf.flush() + + size_t bytes_written() with gil: + return buf.tell() + + +cdef class SinkInfo: + """A class containing details on a sink to write to. + + For details, see :cpp:class:`cudf::io::sink_info`. 
+ + Parameters + ---------- + sinks : List[Union[str, os.PathLike, + io.BytesIO, io.IOBase, + io.StringIO, io.TextIOBase]] + + A homogeneous list of sinks (each a string filename, an + os.PathLike, or one of the Python I/O classes) to write to. + + Only the type of the first sink is inspected to select the sink kind. + """ + + def __init__(self, list sinks): + cdef vector[data_sink *] data_sinks + + cdef vector[string] paths + if isinstance(sinks[0], io.StringIO): + data_sinks.reserve(len(sinks)) + for s in sinks: + self.sink_storage.push_back( + unique_ptr[data_sink](new iobase_data_sink(s)) + ) + data_sinks.push_back(self.sink_storage.back().get()) + self.c_obj = sink_info(data_sinks) + elif isinstance(sinks[0], io.TextIOBase): + data_sinks.reserve(len(sinks)) + for s in sinks: + # Files opened in text mode expect writes to be str rather than + # bytes, which requires conversion from utf-8. If the underlying + # buffer is utf-8, we can bypass this conversion by writing + # directly to it. + if codecs.lookup(s.encoding).name not in {"utf-8", "ascii"}: + raise NotImplementedError(f"Unsupported encoding {s.encoding}") + self.sink_storage.push_back( + unique_ptr[data_sink](new iobase_data_sink(s.buffer)) + ) + data_sinks.push_back(self.sink_storage.back().get()) + self.c_obj = sink_info(data_sinks) + elif isinstance(sinks[0], io.IOBase): + data_sinks.reserve(len(sinks)) + for s in sinks: + self.sink_storage.push_back( + unique_ptr[data_sink](new iobase_data_sink(s)) + ) + data_sinks.push_back(self.sink_storage.back().get()) + self.c_obj = sink_info(data_sinks) + elif isinstance(sinks[0], (basestring, os.PathLike)): + paths.reserve(len(sinks)) + for s in sinks: + paths.push_back(<string> os.path.expanduser(s).encode()) + self.c_obj = sink_info(move(paths)) + else: + raise TypeError("Unrecognized input type: {}".format(type(sinks[0]))) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd index 
8d87deb1472..8f9e8276fad 100644 
--- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd @@ -75,7 +75,6 @@ cdef extern from "cudf/io/types.hpp" \ vector[column_name_info] children cdef cppclass table_metadata: - table_metadata() except + vector[string] column_names map[string, string] user_data