From 5ec60a45ed9b23378b305b3364f3362141817f3a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 19 Jun 2024 15:03:27 +1000 Subject: [PATCH 1/4] c --- .../polars-lazy/src/scan/file_list_reader.rs | 6 +- crates/polars-plan/src/plans/hive.rs | 19 +++--- py-polars/tests/unit/io/test_hive.py | 63 ++++++++++++++++++- 3 files changed, 76 insertions(+), 12 deletions(-) diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs index e31771e78334..3be73c4006a2 100644 --- a/crates/polars-lazy/src/scan/file_list_reader.rs +++ b/crates/polars-lazy/src/scan/file_list_reader.rs @@ -54,6 +54,7 @@ fn expand_paths( } else if !path.ends_with("/") && is_file_cloud(path.to_str().unwrap(), cloud_options)? { + expand_start_idx = 0; out_paths.push(path.clone()); continue; } else if !glob { @@ -120,15 +121,12 @@ fn expand_paths( out_paths.push(path.map_err(to_compute_err)?); } } else { + expand_start_idx = 0; out_paths.push(path.clone()); } } } - // Todo: - // This maintains existing behavior - will remove very soon. - expand_start_idx = 0; - Ok(( out_paths.into_iter().collect::>(), expand_start_idx, diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index 9e2f896020d3..bdb9fe3deecf 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -53,13 +53,18 @@ impl HivePartitions { } let schema = match schema { - Some(s) => { - polars_ensure!( - s.len() == partitions.len(), - SchemaMismatch: "path does not match the provided Hive schema" - ); - s - }, + Some(schema) => Arc::new( + partitions + .iter() + .map(|s| { + let mut field = s.field().into_owned(); + if let Some(dtype) = schema.get(field.name()) { + field.dtype = dtype.clone(); + }; + field + }) + .collect::(), + ), None => Arc::new(partitions.as_slice().into()), }; diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 7997dd024464..274bb2e49bb3 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -187,7 +187,7 @@ def test_hive_partitioned_err(io_files_path: Path, tmp_path: Path) -> None: df.write_parquet(root / "file.parquet") with pytest.raises(DuplicateError, match="invalid Hive partition schema"): - pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True).collect() + pl.scan_parquet(tmp_path, hive_partitioning=True).collect() @pytest.mark.write_disk() @@ -273,3 +273,64 @@ def test_read_parquet_hive_schema_with_pyarrow() -> None: match="cannot use `hive_partitions` with `use_pyarrow=True`", ): pl.read_parquet("test.parquet", hive_schema={"c": pl.Int32}, use_pyarrow=True) + + +def test_hive_partition_directory_scan(tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + + dfs = [ + pl.DataFrame({'x': 5 * [1], 'a': 1, 'b': 1}), + pl.DataFrame({'x': 5 * [2], 'a': 1, 'b': 2}), + pl.DataFrame({'x': 5 * [3], 'a': 2, 'b': 1}), + pl.DataFrame({'x': 5 * [4], 'a': 2, 'b': 2}), + ] # fmt: skip + + for df in dfs: + a = df.item(0, "a") + b = df.item(0, "b") + path = tmp_path / f"a={a}/b={b}/data.bin" + path.parent.mkdir(exist_ok=True, parents=True) + df.drop("a", "b").write_parquet(path) + + df = pl.concat(dfs) + hive_schema = df.lazy().select("a", "b").collect_schema() + + out = pl.scan_parquet( + tmp_path, hive_partitioning=True, hive_schema=hive_schema + ).collect() + assert_frame_equal(out, df) + + out = pl.scan_parquet( + tmp_path, hive_partitioning=False, hive_schema=hive_schema + ).collect() + assert_frame_equal(out, df.drop("a", "b")) + + out = pl.scan_parquet( + tmp_path / "a=1", + hive_partitioning=True, + hive_schema=hive_schema, + ).collect() + assert_frame_equal(out, df.filter(a=1).drop("a")) + + out = pl.scan_parquet( + tmp_path / "a=1", + hive_partitioning=False, + hive_schema=hive_schema, + ).collect() + assert_frame_equal(out, df.filter(a=1).drop("a", "b")) + + path = tmp_path / "a=1/b=1/data.bin" + + df = dfs[0] + out = pl.scan_parquet( + path, hive_partitioning=True, hive_schema=hive_schema + ).collect() + + assert_frame_equal(out, df) + + df = dfs[0].drop("a", "b") + out = pl.scan_parquet( + path, hive_partitioning=False, hive_schema=hive_schema + ).collect() + + assert_frame_equal(out, df) From c6cfca4000467c29b8ea1d511f9f0543fb5ad1b1 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 19 Jun 2024 15:20:12 +1000 Subject: [PATCH 2/4] c --- py-polars/tests/unit/io/test_hive.py | 56 +++++++++++++++++++--------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 274bb2e49bb3..002ee2ac9d51 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -1,7 +1,8 @@ import warnings from collections import OrderedDict +from functools import partial from pathlib import Path -from typing import Any +from typing import Any, Callable import pyarrow.parquet as pq import pytest @@ -275,7 +276,27 @@ def test_read_parquet_hive_schema_with_pyarrow() -> None: pl.read_parquet("test.parquet", hive_schema={"c": pl.Int32}, use_pyarrow=True) -def test_hive_partition_directory_scan(tmp_path: Path) -> None: +@pytest.mark.parametrize( + ("scan_func", "write_func"), + [ + (pl.scan_parquet, pl.DataFrame.write_parquet), + ], +) +@pytest.mark.parametrize( + ["append_glob", "glob"], + [ + ("**/*.bin", True), + ("", True), + ("", False), + ], +) +def test_hive_partition_directory_scan( + tmp_path: Path, + append_glob: str, + scan_func: Callable[[Any], pl.LazyFrame], + write_func: Callable[[pl.DataFrame, Path], None], + glob: bool, +) -> None: tmp_path.mkdir(exist_ok=True) dfs = [ @@ -290,30 +311,35 @@ def test_hive_partition_directory_scan(tmp_path: Path) -> None: b = df.item(0, "b") path = tmp_path / f"a={a}/b={b}/data.bin" path.parent.mkdir(exist_ok=True, parents=True) - df.drop("a", "b").write_parquet(path) + write_func(df.drop("a", "b"), path) df = pl.concat(dfs) hive_schema = df.lazy().select("a", "b").collect_schema() - out = pl.scan_parquet( - tmp_path, hive_partitioning=True, hive_schema=hive_schema + scan = scan_func + scan = partial(scan_func, hive_schema=hive_schema, glob=glob) + + out = scan( + tmp_path / append_glob, + hive_partitioning=True, + hive_schema=hive_schema, ).collect() assert_frame_equal(out, df) - out = pl.scan_parquet( - tmp_path, hive_partitioning=False, hive_schema=hive_schema + out = scan( + tmp_path / append_glob, hive_partitioning=False, hive_schema=hive_schema ).collect() assert_frame_equal(out, df.drop("a", "b")) - out = pl.scan_parquet( - tmp_path / "a=1", + out = scan( + tmp_path / "a=1" / append_glob, hive_partitioning=True, hive_schema=hive_schema, ).collect() assert_frame_equal(out, df.filter(a=1).drop("a")) - out = pl.scan_parquet( - tmp_path / "a=1", + out = scan( + tmp_path / "a=1" / append_glob, hive_partitioning=False, hive_schema=hive_schema, ).collect() @@ -322,15 +348,11 @@ def test_hive_partition_directory_scan(tmp_path: Path) -> None: path = tmp_path / "a=1/b=1/data.bin" df = dfs[0] - out = pl.scan_parquet( - path, hive_partitioning=True, hive_schema=hive_schema - ).collect() + out = scan(path, hive_partitioning=True, hive_schema=hive_schema).collect() assert_frame_equal(out, df) df = dfs[0].drop("a", "b") - out = pl.scan_parquet( - path, hive_partitioning=False, hive_schema=hive_schema - ).collect() + out = scan(path, hive_partitioning=False, hive_schema=hive_schema).collect() assert_frame_equal(out, df) From 206a4f3c0dadbb295379dfcce054cf6a1227b6d7 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 19 Jun 2024 15:21:21 +1000 Subject: [PATCH 3/4] c --- py-polars/tests/unit/io/test_hive.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 002ee2ac9d51..54bd5fc0d942 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -326,33 +326,29 @@ def test_hive_partition_directory_scan( ).collect() assert_frame_equal(out, df) - out = scan( - tmp_path / append_glob, hive_partitioning=False, hive_schema=hive_schema - ).collect() + out = scan(tmp_path / append_glob, hive_partitioning=False).collect() assert_frame_equal(out, df.drop("a", "b")) out = scan( tmp_path / "a=1" / append_glob, hive_partitioning=True, - hive_schema=hive_schema, ).collect() assert_frame_equal(out, df.filter(a=1).drop("a")) out = scan( tmp_path / "a=1" / append_glob, hive_partitioning=False, - hive_schema=hive_schema, ).collect() assert_frame_equal(out, df.filter(a=1).drop("a", "b")) path = tmp_path / "a=1/b=1/data.bin" df = dfs[0] - out = scan(path, hive_partitioning=True, hive_schema=hive_schema).collect() + out = scan(path, hive_partitioning=True).collect() assert_frame_equal(out, df) df = dfs[0].drop("a", "b") - out = scan(path, hive_partitioning=False, hive_schema=hive_schema).collect() + out = scan(path, hive_partitioning=False).collect() assert_frame_equal(out, df) From 67fcf94f036c395039878037e3fefe1bbdf327cc Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 19 Jun 2024 15:21:49 +1000 Subject: [PATCH 4/4] c --- py-polars/tests/unit/io/test_hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 54bd5fc0d942..23a7902c01ff 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -283,7 +283,7 @@ def test_read_parquet_hive_schema_with_pyarrow() -> None: ], ) @pytest.mark.parametrize( - ["append_glob", "glob"], + ("append_glob", "glob"), [ ("**/*.bin", True), ("", True),