Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Add collect_schema method to LazyFrame and DataFrame #16929

Merged
merged 6 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions docs/src/python/user-guide/lazy/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,33 @@
# --8<-- [end:setup]

# --8<-- [start:schema]
q3 = pl.DataFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]}).lazy()
lf = pl.LazyFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]})

print(q3.schema)
print(lf.collect_schema())
# --8<-- [end:schema]

# --8<-- [start:lazyround]
q4 = (
pl.DataFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]})
.lazy()
.with_columns(pl.col("bar").round(0))
lf = pl.LazyFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]}).with_columns(
pl.col("bar").round(0)
)
# --8<-- [end:lazyround]

# --8<-- [start:typecheck]
try:
print(q4.collect())
print(lf.collect())
except Exception as e:
print(e)
# --8<-- [end:typecheck]

# --8<-- [start:lazyeager]
lazy_eager_query = (
pl.DataFrame(
pl.LazyFrame(
{
"id": ["a", "b", "c"],
"month": ["jan", "feb", "mar"],
"values": [0, 1, 2],
}
)
.lazy()
.with_columns((2 * pl.col("values")).alias("double_values"))
.collect()
.pivot(
Expand Down
8 changes: 4 additions & 4 deletions docs/user-guide/lazy/schemas.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Schema

The schema of a Polars `DataFrame` or `LazyFrame` sets out the names of the columns and their datatypes. You can see the schema with the `.schema` method on a `DataFrame` or `LazyFrame`
The schema of a Polars `DataFrame` or `LazyFrame` sets out the names of the columns and their datatypes. You can see the schema with the `.collect_schema` method on a `DataFrame` or `LazyFrame`

{{code_block('user-guide/lazy/schema','schema',['DataFrame','lazy'])}}
{{code_block('user-guide/lazy/schema','schema',['LazyFrame'])}}

```python exec="on" result="text" session="user-guide/lazy/schemas"
--8<-- "python/user-guide/lazy/schema.py:setup"
Expand All @@ -17,7 +17,7 @@ One advantage of the lazy API is that Polars will check the schema before any da

We see how this works in the following simple example where we call the `.round` expression on the integer `bar` column.

{{code_block('user-guide/lazy/schema','lazyround',['lazy','with_columns'])}}
{{code_block('user-guide/lazy/schema','lazyround',['with_columns'])}}

The `.round` expression is only valid for columns with a floating point dtype. Calling `.round` on an integer column means the operation will raise an `InvalidOperationError` when we evaluate the query with `collect`. This schema check happens before the data is processed when we call `collect`.

Expand Down Expand Up @@ -58,7 +58,7 @@ We show how to deal with a non-lazy operation in this example where we:
- do a `.filter`
- finish by executing the query with `.collect` to get a `DataFrame`

{{code_block('user-guide/lazy/schema','lazyeager',['collect','pivot','filter'])}}
{{code_block('user-guide/lazy/schema','lazyeager',['collect','lazy','pivot','filter'])}}

```python exec="on" result="text" session="user-guide/lazy/schemas"
--8<-- "python/user-guide/lazy/schema.py:lazyeager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Miscellaneous
.. autosummary::
:toctree: api/

DataFrame.collect_schema
DataFrame.corr
DataFrame.equals
DataFrame.lazy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Miscellaneous
LazyFrame.cache
LazyFrame.collect
LazyFrame.collect_async
LazyFrame.collect_schema
LazyFrame.fetch
LazyFrame.lazy
LazyFrame.map_batches
Expand Down
30 changes: 14 additions & 16 deletions py-polars/polars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,26 +240,25 @@ def register_lazyframe_namespace(name: str) -> Callable[[type[NS]], type[NS]]:
--------
>>> @pl.api.register_lazyframe_namespace("types")
... class DTypeOperations:
... def __init__(self, ldf: pl.LazyFrame):
... self._ldf = ldf
... def __init__(self, lf: pl.LazyFrame):
... self._lf = lf
...
... def split_by_column_dtypes(self) -> list[pl.LazyFrame]:
... return [
... self._ldf.select(pl.col(tp))
... for tp in dict.fromkeys(self._ldf.dtypes)
... self._lf.select(pl.col(tp))
... for tp in dict.fromkeys(self._lf.collect_schema().dtypes())
... ]
...
... def upcast_integer_types(self) -> pl.LazyFrame:
... return self._ldf.with_columns(
... return self._lf.with_columns(
... pl.col(tp).cast(pl.Int64) for tp in (pl.Int8, pl.Int16, pl.Int32)
... )
>>>
>>> ldf = pl.DataFrame(
>>> lf = pl.LazyFrame(
... data={"a": [1, 2], "b": [3, 4], "c": [5.6, 6.7]},
... schema=[("a", pl.Int16), ("b", pl.Int32), ("c", pl.Float32)],
... ).lazy()
>>>
>>> ldf.collect()
... )
>>> lf.collect()
shape: (2, 3)
┌─────┬─────┬─────┐
│ a ┆ b ┆ c │
Expand All @@ -269,7 +268,7 @@ def register_lazyframe_namespace(name: str) -> Callable[[type[NS]], type[NS]]:
│ 1 ┆ 3 ┆ 5.6 │
│ 2 ┆ 4 ┆ 6.7 │
└─────┴─────┴─────┘
>>> ldf.types.upcast_integer_types().collect()
>>> lf.types.upcast_integer_types().collect()
shape: (2, 3)
┌─────┬─────┬─────┐
│ a ┆ b ┆ c │
Expand All @@ -279,14 +278,13 @@ def register_lazyframe_namespace(name: str) -> Callable[[type[NS]], type[NS]]:
│ 1 ┆ 3 ┆ 5.6 │
│ 2 ┆ 4 ┆ 6.7 │
└─────┴─────┴─────┘
>>>
>>> ldf = pl.DataFrame(

>>> lf = pl.LazyFrame(
... data=[["xx", 2, 3, 4], ["xy", 4, 5, 6], ["yy", 5, 6, 7], ["yz", 6, 7, 8]],
... schema=["a1", "a2", "b1", "b2"],
... orient="row",
... ).lazy()
>>>
>>> ldf.collect()
... )
>>> lf.collect()
shape: (4, 4)
┌─────┬─────┬─────┬─────┐
│ a1 ┆ a2 ┆ b1 ┆ b2 │
Expand All @@ -298,7 +296,7 @@ def register_lazyframe_namespace(name: str) -> Callable[[type[NS]], type[NS]]:
│ yy ┆ 5 ┆ 6 ┆ 7 │
│ yz ┆ 6 ┆ 7 ┆ 8 │
└─────┴─────┴─────┴─────┘
>>> pl.collect_all(ldf.types.split_by_column_dtypes())
>>> pl.collect_all(lf.types.split_by_column_dtypes())
[shape: (4, 1)
┌─────┐
│ a1 │
Expand Down
43 changes: 43 additions & 0 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,49 @@ def _repr_html_(self, *, _from_series: bool = False) -> str:
).render()
)

def collect_schema(self) -> Schema:
"""
Get an ordered mapping of column names to their data type.

This is an alias for the :attr:`schema` property.

See Also
--------
schema

Notes
-----
This method is included to facilitate writing code that is generic for both
DataFrame and LazyFrame.

Examples
--------
Determine the schema.

>>> df = pl.DataFrame(
... {
... "foo": [1, 2, 3],
... "bar": [6.0, 7.0, 8.0],
... "ham": ["a", "b", "c"],
... }
... )
>>> df.collect_schema()
Schema({'foo': Int64, 'bar': Float64, 'ham': String})

Access various properties of the schema using the :class:`Schema` object.

>>> schema = df.collect_schema()
>>> schema["bar"]
Float64
>>> schema.names()
['foo', 'bar', 'ham']
>>> schema.dtypes()
[Int64, Float64, String]
>>> schema.len()
3
"""
return self.schema

def item(self, row: int | None = None, column: int | str | None = None) -> Any:
"""
Return the DataFrame as a scalar, or return the element at the given row/column.
Expand Down
28 changes: 13 additions & 15 deletions py-polars/polars/functions/eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
import contextlib
from functools import reduce
from itertools import chain
from typing import TYPE_CHECKING, Iterable, List, Sequence, cast, get_args
from typing import TYPE_CHECKING, Iterable, Sequence, get_args

import polars._reexport as pl
from polars import functions as F
from polars._utils.various import ordered_unique
from polars._utils.wrap import wrap_df, wrap_expr, wrap_ldf, wrap_s
from polars.exceptions import InvalidOperationError
from polars.type_aliases import ConcatMethod, FrameType
from polars.type_aliases import ConcatMethod

with contextlib.suppress(ImportError): # Module not available when building docs
import polars.polars as plr

if TYPE_CHECKING:
from polars import DataFrame, Expr, LazyFrame, Series
from polars.type_aliases import JoinStrategy, PolarsType
from polars.type_aliases import FrameType, JoinStrategy, PolarsType


def concat(
Expand Down Expand Up @@ -128,7 +128,7 @@ def concat(
# unpack/standardise (handles generator input)
elems = list(items)

if not len(elems) > 0:
if not elems:
msg = "cannot concat empty list"
raise ValueError(msg)
elif len(elems) == 1 and isinstance(
Expand All @@ -142,12 +142,12 @@ def concat(
raise TypeError(msg)

# establish common columns, maintaining the order in which they appear
all_columns = list(chain.from_iterable(e.columns for e in elems))
all_columns = list(chain.from_iterable(e.collect_schema() for e in elems))
key = {v: k for k, v in enumerate(ordered_unique(all_columns))}
common_cols = sorted(
reduce(
lambda x, y: set(x) & set(y), # type: ignore[arg-type, return-value]
chain(e.columns for e in elems),
chain(e.collect_schema() for e in elems),
),
key=lambda k: key.get(k, 0),
)
Expand Down Expand Up @@ -423,7 +423,8 @@ def align_frames(
""" # noqa: W505
if not frames:
return []
elif len({type(f) for f in frames}) != 1:

if len({type(f) for f in frames}) != 1:
msg = (
"input frames must be of a consistent type (all LazyFrame or all DataFrame)"
)
Expand All @@ -435,26 +436,23 @@ def align_frames(

# create aligned master frame (this is the most expensive part; afterwards
# we just subselect out the columns representing the component frames)
idx_frames = tuple((idx, df.lazy()) for idx, df in enumerate(frames))
idx_frames = [(idx, frame.lazy()) for idx, frame in enumerate(frames)]
alignment_frame = _alignment_join(
*idx_frames, align_on=align_on, how=how, descending=descending
)

# select-out aligned components from the master frame
aligned_cols = set(alignment_frame.columns)
aligned_cols = set(alignment_frame.collect_schema())
aligned_frames = []
for idx, df in idx_frames:
for idx, lf in idx_frames:
sfx = f":{idx}"
df_cols = [
F.col(f"{c}{sfx}").alias(c) if f"{c}{sfx}" in aligned_cols else F.col(c)
for c in df.columns
for c in lf.collect_schema()
]
f = alignment_frame.select(*df_cols)
if select is not None:
f = f.select(select)
aligned_frames.append(f)

return cast(
List[FrameType],
F.collect_all(aligned_frames) if eager else aligned_frames,
)
return F.collect_all(aligned_frames) if eager else aligned_frames # type: ignore[return-value]
6 changes: 4 additions & 2 deletions py-polars/polars/io/parquet/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Sequence

import polars.functions as F
from polars._utils.deprecation import deprecate_renamed_parameter
from polars._utils.unstable import issue_unstable_warning
from polars._utils.various import (
Expand Down Expand Up @@ -189,8 +190,9 @@ def read_parquet(

if columns is not None:
if is_int_sequence(columns):
columns = [lf.columns[i] for i in columns]
lf = lf.select(columns)
lf = lf.select(F.nth(columns))
else:
lf = lf.select(columns)

return lf.collect()

Expand Down
Loading