From 87b5bca99c71091be99a698db6c3267e9aa14834 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 16 Nov 2024 18:43:42 +0100
Subject: [PATCH] refactor(python): Use polars parquet reader for delta scan
 (#19103)

Co-authored-by: ritchie
---
 py-polars/polars/io/delta.py          | 133 +++++++++++++++++++-------
 py-polars/tests/unit/io/test_delta.py |  47 +--------
 2 files changed, 101 insertions(+), 79 deletions(-)

diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py
index b2b27c74a20f..6e9ebc7c7433 100644
--- a/py-polars/polars/io/delta.py
+++ b/py-polars/polars/io/delta.py
@@ -3,20 +3,21 @@
 import warnings
 from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, cast
+from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
 
-from polars import DataFrame
 from polars.convert import from_arrow
 from polars.datatypes import Null, Time
 from polars.datatypes.convert import unpack_dtypes
 from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
-from polars.io.pyarrow_dataset import scan_pyarrow_dataset
+from polars.io.parquet import scan_parquet
+from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset
+from polars.schema import Schema
 
 if TYPE_CHECKING:
     from deltalake import DeltaTable
 
-    from polars import DataType, LazyFrame
+    from polars import DataFrame, DataType, LazyFrame
 
 
 def read_delta(
@@ -24,9 +25,10 @@ def read_delta(
     *,
     version: int | str | datetime | None = None,
     columns: list[str] | None = None,
-    rechunk: bool = False,
+    rechunk: bool | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
+    use_pyarrow: bool = False,
     pyarrow_options: dict[str, Any] | None = None,
 ) -> DataFrame:
     """
@@ -57,6 +59,8 @@ def read_delta(
         `__.
     delta_table_options
         Additional keyword arguments while reading a Delta lake Table.
+    use_pyarrow
+        Flag to enable pyarrow dataset reads.
     pyarrow_options
         Keyword arguments while converting a Delta lake Table to pyarrow table.
@@ -72,15 +76,6 @@ def read_delta(
     >>> table_path = "/path/to/delta-table/"
     >>> pl.read_delta(table_path)  # doctest: +SKIP
 
-    Use the `pyarrow_options` parameter to read only certain partitions.
-    Note: This should be preferred over using an equivalent `.filter()` on the resulting
-    DataFrame, as this avoids reading the data at all.
-
-    >>> pl.read_delta(  # doctest: +SKIP
-    ...     table_path,
-    ...     pyarrow_options={"partitions": [("year", "=", "2021")]},
-    ... )
-
     Reads a specific version of the Delta table from local filesystem.
     Note: This will fail if the provided version of the delta table does not exist.
 
@@ -139,22 +134,19 @@ def read_delta(
     ...     table_path, delta_table_options=delta_table_options
     ... )  # doctest: +SKIP
     """
-    if pyarrow_options is None:
-        pyarrow_options = {}
-
-    dl_tbl = _get_delta_lake_table(
-        table_path=source,
+    df = scan_delta(
+        source=source,
         version=version,
         storage_options=storage_options,
         delta_table_options=delta_table_options,
+        use_pyarrow=use_pyarrow,
+        pyarrow_options=pyarrow_options,
+        rechunk=rechunk,
     )
 
-    return cast(
-        DataFrame,
-        from_arrow(
-            dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
-        ),
-    )
+    if columns is not None:
+        df = df.select(columns)
+    return df.collect()
 
 
 def scan_delta(
@@ -163,7 +155,9 @@ def scan_delta(
     *,
     version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
+    use_pyarrow: bool = False,
     pyarrow_options: dict[str, Any] | None = None,
+    rechunk: bool | None = None,
 ) -> LazyFrame:
     """
     Lazily read from a Delta lake table.
@@ -188,10 +182,15 @@ def scan_delta(
         `__.
     delta_table_options
         Additional keyword arguments while reading a Delta lake Table.
+    use_pyarrow
+        Flag to enable pyarrow dataset reads.
     pyarrow_options
         Keyword arguments while converting a Delta lake Table to pyarrow table.
         Use this parameter when filtering on partitioned columns or to read
         from a 'fsspec' supported filesystem.
+    rechunk
+        Make sure that all columns are contiguous in memory by
+        aggregating the chunks into a single array.
 
     Returns
     -------
     LazyFrame
@@ -205,13 +204,6 @@ def scan_delta(
     >>> table_path = "/path/to/delta-table/"
     >>> pl.scan_delta(table_path).collect()  # doctest: +SKIP
 
-    Use the `pyarrow_options` parameter to read only certain partitions.
-
-    >>> pl.scan_delta(  # doctest: +SKIP
-    ...     table_path,
-    ...     pyarrow_options={"partitions": [("year", "=", "2021")]},
-    ... )
-
     Creates a scan for a specific version of the Delta table from local filesystem.
     Note: This will fail if the provided version of the delta table does not exist.
 
@@ -276,9 +268,6 @@ def scan_delta(
     ...     table_path, delta_table_options=delta_table_options
     ... ).collect()  # doctest: +SKIP
     """
-    if pyarrow_options is None:
-        pyarrow_options = {}
-
     dl_tbl = _get_delta_lake_table(
         table_path=source,
         version=version,
@@ -286,8 +275,78 @@ def scan_delta(
         delta_table_options=delta_table_options,
     )
 
-    pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
-    return scan_pyarrow_dataset(pa_ds)
+    if use_pyarrow:
+        pyarrow_options = pyarrow_options or {}
+        pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
+        return scan_pyarrow_dataset(pa_ds)
+
+    if pyarrow_options is not None:
+        msg = "To make use of pyarrow_options, set use_pyarrow to True"
+        raise ValueError(msg)
+
+    import pyarrow as pa
+    from deltalake.exceptions import DeltaProtocolError
+    from deltalake.table import (
+        MAX_SUPPORTED_READER_VERSION,
+        NOT_SUPPORTED_READER_VERSION,
+        SUPPORTED_READER_FEATURES,
+    )
+
+    table_protocol = dl_tbl.protocol()
+    if (
+        table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
+        or table_protocol.min_reader_version == NOT_SUPPORTED_READER_VERSION
+    ):
+        msg = (
+            f"The table's minimum reader version is {table_protocol.min_reader_version} "
+            f"but polars delta scanner only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}"
+        )
+        raise DeltaProtocolError(msg)
+    if (
+        table_protocol.min_reader_version >= 3
+        and table_protocol.reader_features is not None
+    ):
+        missing_features = {*table_protocol.reader_features}.difference(
+            SUPPORTED_READER_FEATURES
+        )
+        if len(missing_features) > 0:
+            msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner."
+            raise DeltaProtocolError(msg)
+
+    # Requires conversion through pyarrow table because there is no direct way yet to
+    # convert a delta schema into a polars schema
+    delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
+    polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema  # type: ignore[union-attr]
+    partition_columns = dl_tbl.metadata().partition_columns
+
+    def _split_schema(
+        schema: Schema, partition_columns: list[str]
+    ) -> tuple[Schema, Schema]:
+        if len(partition_columns) == 0:
+            return schema, Schema([])
+        main_schema = []
+        hive_schema = []
+
+        for name, dtype in schema.items():
+            if name in partition_columns:
+                hive_schema.append((name, dtype))
+            else:
+                main_schema.append((name, dtype))
+
+        return Schema(main_schema), Schema(hive_schema)
+
+    # Required because main_schema cannot contain hive columns currently
+    main_schema, hive_schema = _split_schema(polars_schema, partition_columns)
+
+    return scan_parquet(
+        dl_tbl.file_uris(),
+        schema=main_schema,
+        hive_schema=hive_schema,
+        allow_missing_columns=True,
+        hive_partitioning=len(partition_columns) > 0,
+        storage_options=storage_options,
+        rechunk=rechunk or False,
+    )
 
 
 def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py
index 213988964de3..d142c316c7d1 100644
--- a/py-polars/tests/unit/io/test_delta.py
+++ b/py-polars/tests/unit/io/test_delta.py
@@ -69,18 +69,6 @@ def test_scan_delta_columns(delta_table_path: Path) -> None:
     assert_frame_equal(expected, ldf.collect(), check_dtypes=False)
 
 
-def test_scan_delta_filesystem(delta_table_path: Path) -> None:
-    raw_filesystem = pyarrow.fs.LocalFileSystem()
-    fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)
-
-    ldf = pl.scan_delta(
-        str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
-    )
-
-    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
-    assert_frame_equal(expected, ldf.collect(), check_dtypes=False)
-
-
 def test_scan_delta_relative(delta_table_path: Path) -> None:
     rel_delta_table_path = str(delta_table_path / ".." / "delta-table")
 
@@ -142,18 +130,6 @@ def test_read_delta_columns(delta_table_path: Path) -> None:
     assert_frame_equal(expected, df, check_dtypes=False)
 
 
-def test_read_delta_filesystem(delta_table_path: Path) -> None:
-    raw_filesystem = pyarrow.fs.LocalFileSystem()
-    fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)
-
-    df = pl.read_delta(
-        str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
-    )
-
-    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
-    assert_frame_equal(expected, df, check_dtypes=False)
-
-
 def test_read_delta_relative(delta_table_path: Path) -> None:
     rel_delta_table_path = str(delta_table_path / ".." / "delta-table")
 
@@ -208,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
     assert v1.columns == pl_df_1.columns
 
     assert df_supported.shape == pl_df_partitioned.shape
-    assert df_supported.columns == pl_df_partitioned.columns
+    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)
 
     assert tbl.version() == 1
     assert partitioned_tbl.version() == 0
@@ -240,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
 
     assert partitioned_tbl.version() == 1
     assert pl_df_partitioned.shape == (6, 14)  # Rows are doubled
-    assert df_supported.columns == pl_df_partitioned.columns
+    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)
 
     df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")
 
@@ -494,6 +470,9 @@ def test_unsupported_dtypes(tmp_path: Path) -> None:
         df.write_delta(tmp_path / "time")
 
 
+@pytest.mark.skip(
+    reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
+)
 @pytest.mark.write_disk
 def test_categorical_becomes_string(tmp_path: Path) -> None:
     df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
@@ -502,22 +481,6 @@ def test_categorical_becomes_string(tmp_path: Path) -> None:
     assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))
 
 
-@pytest.mark.write_disk
-@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
-def test_read_parquet_respects_rechunk_16982(
-    rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path
-) -> None:
-    # Create a delta lake table with 3 chunks:
-    df = pl.DataFrame({"a": [1]})
-    df.write_delta(str(tmp_path))
-    df.write_delta(str(tmp_path), mode="append")
-    df.write_delta(str(tmp_path), mode="append")
-
-    rechunk, expected_chunks = rechunk_and_expected_chunks
-    result = pl.read_delta(str(tmp_path), rechunk=rechunk)
-    assert result.n_chunks() == expected_chunks
-
-
 def test_scan_delta_DT_input(delta_table_path: Path) -> None:
     DT = DeltaTable(str(delta_table_path), version=0)
     ldf = pl.scan_delta(DT)
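
A caller-side usage sketch of the behavior this patch introduces (illustrative, not part of the diff): the table path and the `year` partition column below are hypothetical stand-ins, while the keyword arguments and the raised error follow the new `scan_delta`/`read_delta` signatures above.

    import polars as pl

    table_path = "/tmp/delta-table"  # hypothetical table location

    # Default path after this change: scan_delta resolves the table's parquet
    # file URIs and hands them to polars' native scan_parquet, exposing the
    # table's partition columns as hive columns.
    lf = pl.scan_delta(table_path)

    # Partition pruning is now expressed as an ordinary filter on the hive
    # column instead of a pyarrow "partitions" option.
    df = lf.filter(pl.col("year") == 2021).collect()

    # The pyarrow dataset reader remains available behind the new flag;
    # passing pyarrow_options without use_pyarrow=True raises a ValueError.
    lf_pa = pl.scan_delta(
        table_path,
        use_pyarrow=True,
        pyarrow_options={"partitions": [("year", "=", "2021")]},
    )

    # read_delta now routes through scan_delta; rechunk=True makes the
    # collected columns contiguous in memory.
    df_eager = pl.read_delta(table_path, rechunk=True)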