refactor(python): Use polars parquet reader for delta scan (#19103)
Co-authored-by: ritchie <ritchie46@gmail.com>
ion-elgreco and ritchie46 authored Nov 16, 2024
1 parent b41bcbe commit 87b5bca
Showing 2 changed files with 101 additions and 79 deletions.
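Before the diffs, a minimal usage sketch of the new behaviour (the table path and column names are hypothetical; the calls reflect the API shown in the diff below): scan_delta now lists the Delta table's parquet file URIs and hands them to polars' native scan_parquet instead of building a pyarrow dataset, and read_delta becomes a thin wrapper that scans, optionally selects columns, and collects.

import polars as pl

# Hypothetical table path and column names, for illustration only.
table_path = "/path/to/delta-table/"

# Default path after this commit: the table's parquet files are read with
# polars' native parquet scanner rather than a pyarrow dataset.
lf = pl.scan_delta(table_path, version=0)

# read_delta is now built on scan_delta: scan, optionally select columns,
# then collect.
df = pl.read_delta(table_path, columns=["name", "age"], rechunk=True)

# The pyarrow dataset reader remains available behind an explicit opt-in.
lf_pa = pl.scan_delta(table_path, use_pyarrow=True)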
133 changes: 96 additions & 37 deletions py-polars/polars/io/delta.py
@@ -3,30 +3,32 @@
import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from polars import DataFrame
from polars.convert import from_arrow
from polars.datatypes import Null, Time
from polars.datatypes.convert import unpack_dtypes
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.io.pyarrow_dataset import scan_pyarrow_dataset
from polars.io.parquet import scan_parquet
from polars.io.pyarrow_dataset.functions import scan_pyarrow_dataset
from polars.schema import Schema

if TYPE_CHECKING:
from deltalake import DeltaTable

from polars import DataType, LazyFrame
from polars import DataFrame, DataType, LazyFrame


def read_delta(
source: str | DeltaTable,
*,
version: int | str | datetime | None = None,
columns: list[str] | None = None,
rechunk: bool = False,
rechunk: bool | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
) -> DataFrame:
"""
@@ -57,6 +59,8 @@ def read_delta(
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
Keyword arguments while converting a Delta lake Table to pyarrow table.
@@ -72,15 +76,6 @@
>>> table_path = "/path/to/delta-table/"
>>> pl.read_delta(table_path) # doctest: +SKIP
Use the `pyarrow_options` parameter to read only certain partitions.
Note: This should be preferred over using an equivalent `.filter()` on the resulting
DataFrame, as this avoids reading the data at all.
>>> pl.read_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )
Reads a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.
@@ -139,22 +134,19 @@ def read_delta(
... table_path, delta_table_options=delta_table_options
... ) # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}

dl_tbl = _get_delta_lake_table(
table_path=source,
df = scan_delta(
source=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
use_pyarrow=use_pyarrow,
pyarrow_options=pyarrow_options,
rechunk=rechunk,
)

return cast(
DataFrame,
from_arrow(
dl_tbl.to_pyarrow_table(columns=columns, **pyarrow_options), rechunk=rechunk
),
)
if columns is not None:
df = df.select(columns)
return df.collect()


def scan_delta(
@@ -163,7 +155,9 @@ def scan_delta(
version: int | str | datetime | None = None,
storage_options: dict[str, Any] | None = None,
delta_table_options: dict[str, Any] | None = None,
use_pyarrow: bool = False,

ibabeo commented on Dec 20, 2024: This is not backward compatible and will break the code. Would be nice to mention it in the release note :) (See the migration sketch after the end of this file's diff.)
pyarrow_options: dict[str, Any] | None = None,
rechunk: bool | None = None,
) -> LazyFrame:
"""
Lazily read from a Delta lake table.
@@ -188,10 +182,15 @@
<https://delta-io.github.io/delta-rs/usage/loading-table/>`__.
delta_table_options
Additional keyword arguments while reading a Delta lake Table.
use_pyarrow
Flag to enable pyarrow dataset reads.
pyarrow_options
Keyword arguments while converting a Delta lake Table to pyarrow table.
Use this parameter when filtering on partitioned columns or to read
from a 'fsspec' supported filesystem.
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.
Returns
-------
@@ -205,13 +204,6 @@
>>> table_path = "/path/to/delta-table/"
>>> pl.scan_delta(table_path).collect() # doctest: +SKIP
Use the `pyarrow_options` parameter to read only certain partitions.
>>> pl.scan_delta( # doctest: +SKIP
... table_path,
... pyarrow_options={"partitions": [("year", "=", "2021")]},
... )
Creates a scan for a specific version of the Delta table from local filesystem.
Note: This will fail if the provided version of the delta table does not exist.
@@ -276,18 +268,85 @@
... table_path, delta_table_options=delta_table_options
... ).collect() # doctest: +SKIP
"""
if pyarrow_options is None:
pyarrow_options = {}

dl_tbl = _get_delta_lake_table(
table_path=source,
version=version,
storage_options=storage_options,
delta_table_options=delta_table_options,
)

pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)
if use_pyarrow:
pyarrow_options = pyarrow_options or {}
pa_ds = dl_tbl.to_pyarrow_dataset(**pyarrow_options)
return scan_pyarrow_dataset(pa_ds)

if pyarrow_options is not None:
msg = "To make use of pyarrow_options, set use_pyarrow to True"
raise ValueError(msg)

import pyarrow as pa
from deltalake.exceptions import DeltaProtocolError
from deltalake.table import (
MAX_SUPPORTED_READER_VERSION,
NOT_SUPPORTED_READER_VERSION,
SUPPORTED_READER_FEATURES,
)

table_protocol = dl_tbl.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
or table_protocol.min_reader_version == NOT_SUPPORTED_READER_VERSION
):
msg = (
f"The table's minimum reader version is {table_protocol.min_reader_version} "
f"but polars delta scanner only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}"
)
raise DeltaProtocolError(msg)
if (
table_protocol.min_reader_version >= 3
and table_protocol.reader_features is not None
):
missing_features = {*table_protocol.reader_features}.difference(
SUPPORTED_READER_FEATURES
)
if len(missing_features) > 0:
msg = f"The table has set these reader features: {missing_features} but these are not yet supported by the polars delta scanner."
raise DeltaProtocolError(msg)

# Requires conversion through pyarrow table because there is no direct way yet to
# convert a delta schema into a polars schema
delta_schema = dl_tbl.schema().to_pyarrow(as_large_types=True)
polars_schema = from_arrow(pa.Table.from_pylist([], delta_schema)).schema # type: ignore[union-attr]
partition_columns = dl_tbl.metadata().partition_columns

def _split_schema(
schema: Schema, partition_columns: list[str]
) -> tuple[Schema, Schema]:
if len(partition_columns) == 0:
return schema, Schema([])
main_schema = []
hive_schema = []

for name, dtype in schema.items():
if name in partition_columns:
hive_schema.append((name, dtype))
else:
main_schema.append((name, dtype))

return Schema(main_schema), Schema(hive_schema)

# Required because main_schema cannot contain hive columns currently
main_schema, hive_schema = _split_schema(polars_schema, partition_columns)

return scan_parquet(
dl_tbl.file_uris(),
schema=main_schema,
hive_schema=hive_schema,
allow_missing_columns=True,
hive_partitioning=len(partition_columns) > 0,
storage_options=storage_options,
rechunk=rechunk or False,
)


def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
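Regarding the inline comment above on backward compatibility, a migration sketch (hypothetical table path; the partition filter is taken from the removed docstring example): passing pyarrow_options without use_pyarrow=True now raises the ValueError added in this diff, so existing callers need to opt into the pyarrow dataset reader explicitly.

import polars as pl

table_path = "/path/to/delta-table/"  # hypothetical

# Pre-#19103 call pattern that now raises:
#   ValueError: To make use of pyarrow_options, set use_pyarrow to True
# pl.scan_delta(table_path, pyarrow_options={"partitions": [("year", "=", "2021")]})

# Equivalent call after this commit: opt back into the pyarrow dataset reader.
lf = pl.scan_delta(
    table_path,
    use_pyarrow=True,
    pyarrow_options={"partitions": [("year", "=", "2021")]},
)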
47 changes: 5 additions & 42 deletions py-polars/tests/unit/io/test_delta.py
@@ -69,18 +69,6 @@ def test_scan_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

ldf = pl.scan_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -142,18 +130,6 @@ def test_read_delta_columns(delta_table_path: Path) -> None:
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_filesystem(delta_table_path: Path) -> None:
raw_filesystem = pyarrow.fs.LocalFileSystem()
fs = pyarrow.fs.SubTreeFileSystem(str(delta_table_path), raw_filesystem)

df = pl.read_delta(
str(delta_table_path), version=0, pyarrow_options={"filesystem": fs}
)

expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_relative(delta_table_path: Path) -> None:
rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

@@ -208,7 +184,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
assert v1.columns == pl_df_1.columns

assert df_supported.shape == pl_df_partitioned.shape
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

assert tbl.version() == 1
assert partitioned_tbl.version() == 0
@@ -240,7 +216,7 @@ def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:

assert partitioned_tbl.version() == 1
assert pl_df_partitioned.shape == (6, 14) # Rows are doubled
assert df_supported.columns == pl_df_partitioned.columns
assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")

@@ -494,6 +470,9 @@ def test_unsupported_dtypes(tmp_path: Path) -> None:
df.write_delta(tmp_path / "time")


@pytest.mark.skip(
reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
)
@pytest.mark.write_disk
def test_categorical_becomes_string(tmp_path: Path) -> None:
df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
@@ -502,22 +481,6 @@ def test_categorical_becomes_string(tmp_path: Path) -> None:
assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))


@pytest.mark.write_disk
@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
def test_read_parquet_respects_rechunk_16982(
rechunk_and_expected_chunks: tuple[bool, int], tmp_path: Path
) -> None:
# Create a delta lake table with 3 chunks:
df = pl.DataFrame({"a": [1]})
df.write_delta(str(tmp_path))
df.write_delta(str(tmp_path), mode="append")
df.write_delta(str(tmp_path), mode="append")

rechunk, expected_chunks = rechunk_and_expected_chunks
result = pl.read_delta(str(tmp_path), rechunk=rechunk)
assert result.n_chunks() == expected_chunks


def test_scan_delta_DT_input(delta_table_path: Path) -> None:
DT = DeltaTable(str(delta_table_path), version=0)
ldf = pl.scan_delta(DT)
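One side effect visible in the test changes above: the write_delta assertions now compare sorted column names, likely because the native parquet reader surfaces partition columns as hive columns, so the read-back column order can differ from the written frame. A minimal sketch of that expectation, assuming delta_write_options forwards partition_by to delta-rs as in the existing tests:

import polars as pl

df = pl.DataFrame({"year": [2021, 2022], "value": [1.0, 2.0]})
df.write_delta(
    "/tmp/partitioned-table",  # hypothetical path
    delta_write_options={"partition_by": ["year"]},
)

out = pl.read_delta("/tmp/partitioned-table")
# Column order may differ from `df`, so compare names order-insensitively.
assert sorted(out.columns) == sorted(df.columns)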
