From f11198993d553641996c1c20886689e335414067 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 14:45:43 +1000
Subject: [PATCH 01/12] c

---
 crates/polars-io/src/csv/read/buffer.rs       |  18 +--
 crates/polars-io/src/csv/read/mod.rs          |   2 +-
 crates/polars-io/src/options.rs               |   2 +
 .../src/plans/conversion/dsl_to_ir.rs         |   1 +
 crates/polars-plan/src/plans/hive.rs          | 141 ++++++++++--------
 py-polars/polars/io/parquet/functions.py      |  10 ++
 py-polars/src/lazyframe/mod.rs                |   4 +-
 py-polars/tests/unit/io/test_hive.py          |  68 ++++++++-
 8 files changed, 169 insertions(+), 77 deletions(-)

diff --git a/crates/polars-io/src/csv/read/buffer.rs b/crates/polars-io/src/csv/read/buffer.rs
index c706a606d9c2..e16bf99f7f3e 100644
--- a/crates/polars-io/src/csv/read/buffer.rs
+++ b/crates/polars-io/src/csv/read/buffer.rs
@@ -146,7 +146,7 @@ where
     }
 }
 
-pub(crate) struct Utf8Field {
+pub struct Utf8Field {
     name: String,
     mutable: MutableBinaryViewArray<[u8]>,
     scratch: Vec<u8>,
@@ -245,7 +245,7 @@ pub(crate) struct CategoricalField {
 }
 
 #[cfg(feature = "dtype-categorical")]
-pub(crate) struct CategoricalField {
+pub struct CategoricalField {
     escape_scratch: Vec<u8>,
     quote_char: u8,
     builder: CategoricalChunkedBuilder,
@@ -351,7 +351,7 @@ impl ParsedBuffer for BooleanChunkedBuilder {
 }
 
 #[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
-pub(crate) struct DatetimeField<T: PolarsNumericType> {
+pub struct DatetimeField<T: PolarsNumericType> {
     compiled: Option<DatetimeInfer<T>>,
     builder: PrimitiveChunkedBuilder<T>,
 }
@@ -480,7 +480,7 @@ where
     }
 }
 
-pub(crate) fn init_buffers(
+pub fn init_buffers(
     projection: &[usize],
     capacity: usize,
     schema: &Schema,
@@ -552,7 +552,7 @@ pub(crate) fn init_buffers(
 }
 
 #[allow(clippy::large_enum_variant)]
-pub(crate) enum Buffer {
+pub enum Buffer {
     Boolean(BooleanChunkedBuilder),
     #[cfg(feature = "dtype-i8")]
     Int8(PrimitiveChunkedBuilder<Int8Type>),
@@ -585,7 +585,7 @@ pub(crate) enum Buffer {
 }
 
 impl Buffer {
-    pub(crate) fn into_series(self) -> PolarsResult<Series> {
+    pub fn into_series(self) -> PolarsResult<Series> {
         let s = match self {
             Buffer::Boolean(v) => v.finish().into_series(),
             #[cfg(feature = "dtype-i8")]
@@ -642,7 +642,7 @@ impl Buffer {
         Ok(s)
     }
 
-    pub(crate) fn add_null(&mut self, valid: bool) {
+    pub fn add_null(&mut self, valid: bool) {
         match self {
             Buffer::Boolean(v) => v.append_null(),
             #[cfg(feature = "dtype-i8")]
@@ -686,7 +686,7 @@ impl Buffer {
         };
     }
 
-    pub(crate) fn dtype(&self) -> DataType {
+    pub fn dtype(&self) -> DataType {
         match self {
             Buffer::Boolean(_) => DataType::Boolean,
             #[cfg(feature = "dtype-i8")]
@@ -723,7 +723,7 @@ impl Buffer {
     }
 
     #[inline]
-    pub(crate) fn add(
+    pub fn add(
         &mut self,
         bytes: &[u8],
         ignore_errors: bool,
diff --git a/crates/polars-io/src/csv/read/mod.rs b/crates/polars-io/src/csv/read/mod.rs
index e50cbc4a5bb3..aced5f377208 100644
--- a/crates/polars-io/src/csv/read/mod.rs
+++ b/crates/polars-io/src/csv/read/mod.rs
@@ -16,7 +16,7 @@
 //! }
 //! ```
 
-mod buffer;
+pub mod buffer;
 mod options;
 mod parser;
 mod read_impl;
diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs
index 607e66052e29..338bb819a099 100644
--- a/crates/polars-io/src/options.rs
+++ b/crates/polars-io/src/options.rs
@@ -22,6 +22,7 @@ pub struct HiveOptions {
     pub enabled: Option<bool>,
     pub hive_start_idx: usize,
     pub schema: Option<SchemaRef>,
+    pub try_parse_dates: bool,
 }
 
 impl Default for HiveOptions {
@@ -30,6 +31,7 @@ impl Default for HiveOptions {
             enabled: Some(true),
             hive_start_idx: 0,
             schema: None,
+            try_parse_dates: true,
         }
     }
 }
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index 0836e0ca305f..5cefebad7aac 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -155,6 +155,7 @@ pub fn to_alp_impl(
                         },
                         Either::Right(v) => v.as_ref(),
                     },
+                    file_options.hive_options.try_parse_dates,
                 )?
             } else {
                 None
diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 44937dd3ecef..10659cfa8863 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -1,10 +1,10 @@
 use std::path::{Path, PathBuf};
 
-use percent_encoding::percent_decode_str;
+use percent_encoding::percent_decode;
+use polars_core::error::to_compute_err;
 use polars_core::prelude::*;
 use polars_io::predicates::{BatchStats, ColumnStats};
 use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
-use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
@@ -66,7 +66,21 @@ pub fn hive_partitions_from_paths(
     hive_start_idx: usize,
     schema: Option<SchemaRef>,
     reader_schema: &Schema,
+    try_parse_dates: bool,
 ) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
+    let paths = paths
+        .iter()
+        .map(|x| {
+            Ok(PathBuf::from(
+                percent_decode(x.to_str().unwrap().as_bytes())
+                    .decode_utf8()
+                    .map_err(to_compute_err)?
+                    .as_ref(),
+            ))
+        })
+        .collect::<PolarsResult<Vec<PathBuf>>>()?;
+    let paths = paths.as_slice();
+
     let Some(path) = paths.first() else {
         return Ok(None);
     };
@@ -90,16 +104,23 @@ pub fn hive_partitions_from_paths(
 
     let hive_schema = if let Some(ref schema) = schema {
         Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
-            let Some(dtype) = schema.get(name) else {
-                polars_bail!(
-                    SchemaFieldNotFound:
-                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
-                    name,
-                    path
-                )
-            };
-            Ok(Field::new(name, dtype.clone()))
-        }).collect::<PolarsResult<Schema>>()?)
+            let Some(dtype) = schema.get(name) else {
+                polars_bail!(
+                    SchemaFieldNotFound:
+                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
+                    name,
+                    path
+                )
+            };
+
+            let dtype = if !try_parse_dates && dtype.is_temporal() {
+                DataType::String
+            } else {
+                dtype.clone()
+            };
+
+            Ok(Field::new(name, dtype))
+        }).collect::<PolarsResult<Schema>>()?)
     } else {
         let mut hive_schema = Schema::with_capacity(16);
         let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
@@ -127,7 +148,11 @@ pub fn hive_partitions_from_paths(
                 continue;
             };
 
-            entry.insert(infer_field_schema(value, false, false));
+            if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
+                continue;
+            }
+
+            entry.insert(infer_field_schema(value, try_parse_dates, false));
         }
     }
 
@@ -139,40 +164,60 @@ pub fn hive_partitions_from_paths(
         Arc::new(hive_schema)
     };
 
-    let mut hive_partitions = Vec::with_capacity(paths.len());
+    let mut buffers = polars_io::csv::read::buffer::init_buffers(
+        &(0..hive_schema.len()).collect::<Vec<usize>>(),
+        paths.len(),
+        hive_schema.as_ref(),
+        None,
+        polars_io::prelude::CsvEncoding::Utf8,
+        false,
+    )?;
 
     for path in paths {
         let path = path.to_str().unwrap();
 
-        let column_stats = get_hive_parts_iter!(path)
-            .map(|(name, value)| {
-                let Some(dtype) = hive_schema.as_ref().get(name) else {
-                    polars_bail!(
-                        SchemaFieldNotFound:
-                        "path contains column not present in the given Hive schema: {:?}, path = {:?}",
-                        name,
-                        path
-                    )
-                };
-
-                Ok(ColumnStats::from_column_literal(value_to_series(
-                    name,
-                    value,
-                    Some(dtype),
-                )?))
-            })
-            .collect::<PolarsResult<Vec<_>>>()?;
+        for (name, value) in get_hive_parts_iter!(path) {
+            let Some(index) = hive_schema.index_of(name) else {
+                polars_bail!(
+                    SchemaFieldNotFound:
+                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
+                    name,
+                    path
+                )
+            };
+
+            let buf = buffers.get_mut(index).unwrap();
+
+            if !value.is_empty() && value != "__HIVE_DEFAULT_PARTITION__" {
+                buf.add(value.as_bytes(), false, false, false)?;
+            } else {
+                buf.add_null(false);
+            }
+        }
+    }
+
+    let mut hive_partitions = Vec::with_capacity(paths.len());
+    let buffers = buffers
+        .into_iter()
+        .map(|x| x.into_series())
+        .collect::<PolarsResult<Vec<Series>>>()?;
+
+    #[allow(clippy::needless_range_loop)]
+    for i in 0..paths.len() {
+        let column_stats = buffers
+            .iter()
+            .map(|x| ColumnStats::from_column_literal(x.slice(i as i64, 1)))
+            .collect::<Vec<_>>();
 
         if column_stats.is_empty() {
             polars_bail!(
                 ComputeError: "expected Hive partitioned path, got {}\n\n\
                 This error occurs if some paths are Hive partitioned and some paths are not.",
-                path
+                paths[i].to_str().unwrap(),
             )
         }
 
         let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
-
         hive_partitions.push(HivePartitions { stats });
     }
@@ -215,33 +260,3 @@ fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
 
     Some((name, value))
 }
-
-/// Parse a string value into a single-value [`Series`].
-fn value_to_series(name: &str, value: &str, dtype: Option<&DataType>) -> PolarsResult<Series> {
-    let fn_err = || polars_err!(ComputeError: "unable to parse Hive partition value: {:?}", value);
-
-    let mut s = if INTEGER_RE.is_match(value) {
-        let value = value.parse::<i64>().map_err(|_| fn_err())?;
-        Series::new(name, &[value])
-    } else if BOOLEAN_RE.is_match(value) {
-        let value = value.parse::<bool>().map_err(|_| fn_err())?;
-        Series::new(name, &[value])
-    } else if FLOAT_RE.is_match(value) {
-        let value = value.parse::<f64>().map_err(|_| fn_err())?;
-        Series::new(name, &[value])
-    } else if value == "__HIVE_DEFAULT_PARTITION__" {
-        Series::new_null(name, 1)
-    } else {
-        let value = percent_decode_str(value)
-            .decode_utf8()
-            .map_err(|_| fn_err())?;
-        Series::new(name, &[value])
-    };
-
-    // TODO: Avoid expensive logic above when dtype is known
-    if let Some(dt) = dtype {
-        s = s.strict_cast(dt)?;
-    }
-
-    Ok(s)
-}
diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py
index 6e8c924db326..9e9e93f5875c 100644
--- a/py-polars/polars/io/parquet/functions.py
+++ b/py-polars/polars/io/parquet/functions.py
@@ -44,6 +44,7 @@ def read_parquet(
     hive_partitioning: bool | None = None,
     glob: bool = True,
     hive_schema: SchemaDict | None = None,
+    try_parse_hive_dates: bool = True,
     rechunk: bool = False,
     low_memory: bool = False,
     storage_options: dict[str, Any] | None = None,
@@ -94,6 +95,8 @@ def read_parquet(
         .. warning::
             This functionality is considered **unstable**. It may be changed
             at any point without it being considered a breaking change.
+    try_parse_hive_dates
+        Whether to try parsing hive values as date/datetime types.
     rechunk
         Make sure that all columns are contiguous in memory by
         aggregating the chunks into a single array.
@@ -182,6 +185,7 @@ def read_parquet(
         use_statistics=use_statistics,
         hive_partitioning=hive_partitioning,
         hive_schema=hive_schema,
+        try_parse_hive_dates=try_parse_hive_dates,
         rechunk=rechunk,
         low_memory=low_memory,
         cache=False,
@@ -294,6 +298,7 @@ def scan_parquet(
     hive_partitioning: bool | None = None,
     glob: bool = True,
     hive_schema: SchemaDict | None = None,
+    try_parse_hive_dates: bool = True,
     rechunk: bool = False,
     low_memory: bool = False,
     cache: bool = True,
@@ -336,6 +341,8 @@ def scan_parquet(
         .. warning::
             This functionality is considered **unstable**. It may be changed
             at any point without it being considered a breaking change.
+    try_parse_hive_dates
+        Whether to try parsing hive values as date/datetime types.
     rechunk
         In case of reading multiple files via a glob pattern rechunk the final
         DataFrame into contiguous memory chunks.
@@ -404,6 +411,7 @@ def scan_parquet(
         use_statistics=use_statistics,
         hive_partitioning=hive_partitioning,
         hive_schema=hive_schema,
+        try_parse_hive_dates=try_parse_hive_dates,
         retries=retries,
         glob=glob,
     )
@@ -424,6 +432,7 @@ def _scan_parquet_impl(
     hive_partitioning: bool | None = None,
     glob: bool = True,
     hive_schema: SchemaDict | None = None,
+    try_parse_hive_dates: bool = True,
     retries: int = 0,
 ) -> LazyFrame:
     if isinstance(source, list):
@@ -451,6 +460,7 @@ def _scan_parquet_impl(
         use_statistics=use_statistics,
         hive_partitioning=hive_partitioning,
         hive_schema=hive_schema,
+        try_parse_hive_dates=try_parse_hive_dates,
         retries=retries,
         glob=glob,
     )
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index e4b9794868d0..478aec64d521 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -286,7 +286,7 @@ impl PyLazyFrame {
     #[cfg(feature = "parquet")]
     #[staticmethod]
     #[pyo3(signature = (path, paths, n_rows, cache, parallel, rechunk, row_index,
-        low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, retries, glob)
+        low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob)
     )]
     fn new_from_parquet(
         path: Option<PathBuf>,
@@ -301,6 +301,7 @@ impl PyLazyFrame {
         use_statistics: bool,
         hive_partitioning: Option<bool>,
         hive_schema: Option<Wrap<Schema>>,
+        try_parse_hive_dates: bool,
         retries: usize,
         glob: bool,
     ) -> PyResult<Self> {
@@ -336,6 +337,7 @@ impl PyLazyFrame {
             enabled: hive_partitioning,
             hive_start_idx: 0,
             schema: hive_schema,
+            try_parse_dates: try_parse_hive_dates,
         };
 
         let args = ScanArgsParquet {
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 4563885c72ad..4d5064b2014d 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -1,6 +1,8 @@
 import os
+import sys
 import warnings
 from collections import OrderedDict
+from datetime import datetime
 from functools import partial
 from multiprocessing import get_context
 from pathlib import Path
@@ -25,13 +27,10 @@ def impl_test_hive_partitioned_predicate_pushdown(
 
     root = tmp_path / "partitioned_data"
 
-    # Ignore the pyarrow legacy warning until we can write properly with new settings.
-    warnings.filterwarnings("ignore")
     pq.write_to_dataset(
         df.to_arrow(),
         root_path=root,
         partition_cols=["category", "fats_g"],
-        use_legacy_dataset=True,
     )
 
     q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=False)  # checks schema
@@ -577,3 +576,66 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
         rhs,
     )
     assert_with_projections(lf, rhs)
+
+
+@pytest.mark.write_disk()
+def test_hive_partition_dates(tmp_path: Path) -> None:
+    df = pl.DataFrame(
+        {
+            "date1": [
+                datetime(2024, 1, 1),
+                datetime(2024, 2, 1),
+                datetime(2024, 3, 1),
+                None,
+            ],
+            "date2": [
+                datetime(2023, 1, 1),
+                datetime(2023, 2, 1),
+                None,
+                datetime(2023, 3, 1),
+            ],
+            "x": [1, 2, 3, 4],
+        },
+        schema={"date1": pl.Date, "date2": pl.Datetime, "x": pl.Int32},
+    )
+
+    root = tmp_path / "pyarrow"
+    pq.write_to_dataset(
+        df.to_arrow(),
+        root_path=root,
+        partition_cols=["date1", "date2"],
+    )
+
+    lf = pl.scan_parquet(
+        root, hive_schema=df.clear().select("date1", "date2").collect_schema()
+    )
+    assert_frame_equal(lf.collect(), df.select("x", "date1", "date2"))
+
+    lf = pl.scan_parquet(root)
+    assert_frame_equal(lf.collect(), df.select("x", "date1", "date2"))
+
+    lf = pl.scan_parquet(root, try_parse_hive_dates=False)
+    assert_frame_equal(
+        lf.collect(),
+        df.select("x", "date1", "date2").with_columns(
+            pl.col("date1", "date2").cast(pl.String)
+        ),
+    )
+
+    # Windows doesn't support colons in path names
+    if sys.platform == "win32":
+        return
+
+    root = tmp_path / "includes_hive_cols_in_file"
+
+    for (date1, date2), part_df in df.group_by(
+        pl.col("date1").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
+        pl.col("date2").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
+    ):
+        path = root / f"date1={date1}/date2={date2}/data.bin"
+        path.parent.mkdir(exist_ok=True, parents=True)
+        part_df.write_parquet(path)
+
+    # The schema for the hive columns is included in the file, so it should just work
+    lf = pl.scan_parquet(root)
+    assert_frame_equal(lf.collect(), df)

From 8583c0e7d9ab9653d6e999952830b3f899dae6fd Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 16:33:42 +1000
Subject: [PATCH 02/12] c

---
 py-polars/tests/unit/io/test_hive.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 4d5064b2014d..e3b7e83a8133 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -622,8 +622,8 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
         ),
     )
 
-    # Windows doesn't support colons in path names
-    if sys.platform == "win32":
+    # These don't support colons in filename
+    if sys.platform == "win32" or os.getenv("POLARS_FORCE_ASYNC", "0") == "1":
         return
 
     root = tmp_path / "includes_hive_cols_in_file"

From de542bd887f138f55fd6df6c55ff73f9cbd34fef Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 16:49:35 +1000
Subject: [PATCH 03/12] c

---
 py-polars/tests/unit/io/test_hive.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index e3b7e83a8133..2eb02231b249 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -1,5 +1,5 @@
 import os
-import sys
+import urllib.parse
 import warnings
 from collections import OrderedDict
 from datetime import datetime
@@ -622,16 +622,13 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
         ),
     )
 
-    # These don't support colons in filename
-    if sys.platform == "win32" or os.getenv("POLARS_FORCE_ASYNC", "0") == "1":
-        return
-
     root = tmp_path / "includes_hive_cols_in_file"
 
     for (date1, date2), part_df in df.group_by(
         pl.col("date1").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
         pl.col("date2").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
     ):
+        date2 = urllib.parse.quote(date2)
         path = root / f"date1={date1}/date2={date2}/data.bin"
         path.parent.mkdir(exist_ok=True, parents=True)
         part_df.write_parquet(path)

From 91d7f65f34c998852bb1aad2745e8a60c0657758 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 16:54:58 +1000
Subject: [PATCH 04/12] c

---
 crates/polars-plan/src/plans/hive.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 10659cfa8863..4e6694153d2e 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -206,7 +206,9 @@ pub fn hive_partitions_from_paths(
     for i in 0..paths.len() {
         let column_stats = buffers
             .iter()
-            .map(|x| ColumnStats::from_column_literal(x.slice(i as i64, 1)))
+            .map(|x| {
+                ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as u64]) })
+            })
             .collect::<Vec<_>>();
 
         if column_stats.is_empty() {

From cf8afa75e93a02d154be57e32ddab03def9bf3b0 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 16:59:58 +1000
Subject: [PATCH 05/12] c

---
 crates/polars-io/src/csv/read/buffer.rs | 2 +-
 py-polars/tests/unit/io/test_hive.py    | 7 +++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/crates/polars-io/src/csv/read/buffer.rs b/crates/polars-io/src/csv/read/buffer.rs
index e16bf99f7f3e..26e9359a6000 100644
--- a/crates/polars-io/src/csv/read/buffer.rs
+++ b/crates/polars-io/src/csv/read/buffer.rs
@@ -240,7 +240,7 @@ impl ParsedBuffer for Utf8Field {
 }
 
 #[cfg(not(feature = "dtype-categorical"))]
-pub(crate) struct CategoricalField {
+pub struct CategoricalField {
     phantom: std::marker::PhantomData<u8>,
 }
 
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 2eb02231b249..34129be9fec3 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -626,10 +626,13 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
 
     for (date1, date2), part_df in df.group_by(
         pl.col("date1").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
-        pl.col("date2").cast(pl.String).fill_null("__HIVE_DEFAULT_PARTITION__"),
+        pl.col("date2")
+        .cast(pl.String)
+        .map_elements(urllib.parse.quote, return_dtype=pl.String)
+        .fill_null("__HIVE_DEFAULT_PARTITION__"),
     ):
-        date2 = urllib.parse.quote(date2)
         path = root / f"date1={date1}/date2={date2}/data.bin"
+        print(date2, path)
         path.parent.mkdir(exist_ok=True, parents=True)
         part_df.write_parquet(path)

From 1c138a71bd14c9c3965533b0bbb32c05343526b1 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:00:47 +1000
Subject: [PATCH 06/12] c

---
 py-polars/tests/unit/io/test_hive.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 34129be9fec3..2d2529a57ffb 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -632,7 +632,6 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
         .fill_null("__HIVE_DEFAULT_PARTITION__"),
     ):
         path = root / f"date1={date1}/date2={date2}/data.bin"
-        print(date2, path)
         path.parent.mkdir(exist_ok=True, parents=True)
         part_df.write_parquet(path)
 

From 877a0c21b3f0c2e83c71e3a7e833a9b6972d37ba Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:04:49 +1000
Subject: [PATCH 07/12] c

---
 crates/polars-plan/src/plans/hive.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 4e6694153d2e..d70fed17e9d4 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -207,7 +207,7 @@ pub fn hive_partitions_from_paths(
         let column_stats = buffers
             .iter()
             .map(|x| {
-                ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as u64]) })
+                ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as IdxSize]) })
             })
             .collect::<Vec<_>>();
 

From 1458145854a9376e31f63d07a12e9620eeae6c18 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:07:59 +1000
Subject: [PATCH 08/12] c

---
 crates/polars-plan/src/plans/hive.rs | 6 ++++++
 py-polars/tests/unit/io/test_hive.py | 6 ++++++
 2 files changed, 12 insertions(+)

diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index d70fed17e9d4..adc7b2256c7f 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -129,6 +129,12 @@ pub fn hive_partitions_from_paths(
         for (name, _) in get_hive_parts_iter!(path_string) {
             // If the column is also in the file we can use the dtype stored there.
             if let Some(dtype) = reader_schema.get(name) {
+                let dtype = if !try_parse_dates && dtype.is_temporal() {
+                    DataType::String
+                } else {
+                    dtype.clone()
+                };
+
                 hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
                 continue;
             }
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 2d2529a57ffb..3ad4054128cd 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -638,3 +638,9 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
     # The schema for the hive columns is included in the file, so it should just work
     lf = pl.scan_parquet(root)
     assert_frame_equal(lf.collect(), df)
+
+    lf = pl.scan_parquet(root, try_parse_hive_dates=False)
+    assert_frame_equal(
+        lf.collect(),
+        df.with_columns(pl.col("date1", "date2").cast(pl.String)),
+    )

From 473054213034e5e5da8fb67baed5b7b6dbcebcba Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:33:44 +1000
Subject: [PATCH 09/12] c

---
 py-polars/tests/unit/io/test_hive.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 3ad4054128cd..8fa5214d521b 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -622,6 +622,10 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
         ),
     )
 
+    # FIXME: Path gets un-escaped incorrectly for async
+    if os.getenv("POLARS_FORCE_ASYNC", "0") == "1":
+        return
+
     root = tmp_path / "includes_hive_cols_in_file"

From adf7e4ee06321204dbb9bc8c6a22d20f5d05e640 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:34:26 +1000
Subject: [PATCH 10/12] c

---
 py-polars/tests/unit/io/test_hive.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 8fa5214d521b..15970f514fe2 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -579,7 +579,10 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
 
 
 @pytest.mark.write_disk()
-def test_hive_partition_dates(tmp_path: Path) -> None:
+def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None:
+    # FIXME: Path gets un-escaped incorrectly for async
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
+
     df = pl.DataFrame(
         {
             "date1": [
@@ -622,10 +625,6 @@ def test_hive_partition_dates(tmp_path: Path) -> None:
         ),
     )
 
-    # FIXME: Path gets un-escaped incorrectly for async
-    if os.getenv("POLARS_FORCE_ASYNC", "0") == "1":
-        return
-
     root = tmp_path / "includes_hive_cols_in_file"

From 6b08b3db46d1e53904d4eb31b360d31bbb7aeba4 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:34:51 +1000
Subject: [PATCH 11/12] c

---
 py-polars/tests/unit/io/test_hive.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 15970f514fe2..ab77b53be50c 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -580,7 +580,7 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
 
 @pytest.mark.write_disk()
 def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None:
-    # FIXME: Path gets un-escaped incorrectly for async
+    # FIXME: Path gets incorrectly un-escaped for async
     monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
 
     df = pl.DataFrame(

From 1e33bff1285410a91e4fb21d8d62c2e1905cabd7 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Jun 2024 17:55:29 +1000
Subject: [PATCH 12/12] c

---
 py-polars/tests/unit/io/test_hive.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index ab77b53be50c..1801ca98456a 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -580,7 +580,7 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
 
 @pytest.mark.write_disk()
 def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None:
-    # FIXME: Path gets incorrectly un-escaped for async
+    # TODO: Path gets incorrectly un-escaped for async
     monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
 
     df = pl.DataFrame(
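
Usage note for the series: the user-visible change is the new `try_parse_hive_dates`
flag, threaded from `pl.scan_parquet` / `pl.read_parquet` through `ScanArgsParquet`
into `HiveOptions.try_parse_dates`. A minimal sketch of the intended behaviour,
assuming a dataset written with a hive layout like `date1=2024-01-01/...` as in the
test above (the `root` path below is illustrative, not taken from the patches):

    import polars as pl

    root = "./hive_dataset"  # illustrative path to a hive-partitioned dataset

    # Default: partition values that look like dates/datetimes are parsed,
    # so a `date1=2024-01-01` directory scans as a pl.Date column.
    typed = pl.scan_parquet(root).collect()

    # Opted out: the same partition values stay pl.String.
    raw = pl.scan_parquet(root, try_parse_hive_dates=False).collect()

Empty values and `__HIVE_DEFAULT_PARTITION__` become nulls instead of inferred
strings, and paths are percent-decoded up front; partition values are then
materialized through the CSV reader's typed buffers in place of the removed
`value_to_series` parser.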