feat: Support date/datetime for hive parts #17256

Merged · 12 commits · Jun 28, 2024
20 changes: 10 additions & 10 deletions crates/polars-io/src/csv/read/buffer.rs
@@ -146,7 +146,7 @@ where
}
}

-pub(crate) struct Utf8Field {
+pub struct Utf8Field {
name: String,
mutable: MutableBinaryViewArray<str>,
scratch: Vec<u8>,
@@ -240,12 +240,12 @@ impl ParsedBuffer for Utf8Field {
}

#[cfg(not(feature = "dtype-categorical"))]
-pub(crate) struct CategoricalField {
+pub struct CategoricalField {
phantom: std::marker::PhantomData<u8>,
}

#[cfg(feature = "dtype-categorical")]
-pub(crate) struct CategoricalField {
+pub struct CategoricalField {
escape_scratch: Vec<u8>,
quote_char: u8,
builder: CategoricalChunkedBuilder,
@@ -351,7 +351,7 @@ impl ParsedBuffer for BooleanChunkedBuilder {
}

#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
-pub(crate) struct DatetimeField<T: PolarsNumericType> {
+pub struct DatetimeField<T: PolarsNumericType> {
compiled: Option<DatetimeInfer<T>>,
builder: PrimitiveChunkedBuilder<T>,
}
@@ -480,7 +480,7 @@ where
}
}

-pub(crate) fn init_buffers(
+pub fn init_buffers(
projection: &[usize],
capacity: usize,
schema: &Schema,
@@ -552,7 +552,7 @@ pub(crate) fn init_buffers(
}

#[allow(clippy::large_enum_variant)]
-pub(crate) enum Buffer {
+pub enum Buffer {
Boolean(BooleanChunkedBuilder),
#[cfg(feature = "dtype-i8")]
Int8(PrimitiveChunkedBuilder<Int8Type>),
@@ -585,7 +585,7 @@ pub(crate) enum Buffer {
}

impl Buffer {
-pub(crate) fn into_series(self) -> PolarsResult<Series> {
+pub fn into_series(self) -> PolarsResult<Series> {
let s = match self {
Buffer::Boolean(v) => v.finish().into_series(),
#[cfg(feature = "dtype-i8")]
@@ -642,7 +642,7 @@ impl Buffer {
Ok(s)
}

-pub(crate) fn add_null(&mut self, valid: bool) {
+pub fn add_null(&mut self, valid: bool) {
match self {
Buffer::Boolean(v) => v.append_null(),
#[cfg(feature = "dtype-i8")]
@@ -686,7 +686,7 @@ impl Buffer {
};
}

-pub(crate) fn dtype(&self) -> DataType {
+pub fn dtype(&self) -> DataType {
match self {
Buffer::Boolean(_) => DataType::Boolean,
#[cfg(feature = "dtype-i8")]
@@ -723,7 +723,7 @@ impl Buffer {
}

#[inline]
-pub(crate) fn add(
+pub fn add(
&mut self,
bytes: &[u8],
ignore_errors: bool,
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/mod.rs
@@ -16,7 +16,7 @@
//! }
//! ```

-mod buffer;
+pub mod buffer;
mod options;
mod parser;
mod read_impl;
2 changes: 2 additions & 0 deletions crates/polars-io/src/options.rs
@@ -22,6 +22,7 @@ pub struct HiveOptions {
pub enabled: Option<bool>,
pub hive_start_idx: usize,
pub schema: Option<SchemaRef>,
+pub try_parse_dates: bool,
}

impl Default for HiveOptions {
@@ -30,6 +31,7 @@ impl Default for HiveOptions {
enabled: Some(true),
hive_start_idx: 0,
schema: None,
+try_parse_dates: true,
}
}
}
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -155,6 +155,7 @@ pub fn to_alp_impl(
},
Either::Right(v) => v.as_ref(),
},
+file_options.hive_options.try_parse_dates,
)?
} else {
None
147 changes: 85 additions & 62 deletions crates/polars-plan/src/plans/hive.rs
@@ -1,10 +1,10 @@
use std::path::{Path, PathBuf};

-use percent_encoding::percent_decode_str;
+use percent_encoding::percent_decode;
+use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
+use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
-use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

@@ -66,7 +66,21 @@ pub fn hive_partitions_from_paths(
hive_start_idx: usize,
schema: Option<SchemaRef>,
reader_schema: &Schema,
+try_parse_dates: bool,
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
+let paths = paths
+.iter()
+.map(|x| {
+Ok(PathBuf::from(
+percent_decode(x.to_str().unwrap().as_bytes())
+.decode_utf8()
+.map_err(to_compute_err)?
+.as_ref(),
+))
+})
+.collect::<PolarsResult<Vec<PathBuf>>>()?;
+let paths = paths.as_slice();

let Some(path) = paths.first() else {
return Ok(None);
};
@@ -90,16 +104,23 @@

let hive_schema = if let Some(ref schema) = schema {
Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
-let Some(dtype) = schema.get(name) else {
-polars_bail!(
-SchemaFieldNotFound:
-"path contains column not present in the given Hive schema: {:?}, path = {:?}",
-name,
-path
-)
-};
-Ok(Field::new(name, dtype.clone()))
-}).collect::<PolarsResult<Schema>>()?)
+let Some(dtype) = schema.get(name) else {
+polars_bail!(
+SchemaFieldNotFound:
+"path contains column not present in the given Hive schema: {:?}, path = {:?}",
+name,
+path
+)
+};
+
+let dtype = if !try_parse_dates && dtype.is_temporal() {
+DataType::String
+} else {
+dtype.clone()
+};
+
+Ok(Field::new(name, dtype))
+}).collect::<PolarsResult<Schema>>()?)
} else {
let mut hive_schema = Schema::with_capacity(16);
let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
@@ -108,6 +129,12 @@
for (name, _) in get_hive_parts_iter!(path_string) {
// If the column is also in the file we can use the dtype stored there.
if let Some(dtype) = reader_schema.get(name) {
+let dtype = if !try_parse_dates && dtype.is_temporal() {
+DataType::String
+} else {
+dtype.clone()
+};

hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
continue;
}
@@ -127,7 +154,11 @@
continue;
};

-entry.insert(infer_field_schema(value, false, false));
+if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
+continue;
+}
+
+entry.insert(infer_field_schema(value, try_parse_dates, false));
}
}

@@ -139,40 +170,62 @@
Arc::new(hive_schema)
};

-let mut hive_partitions = Vec::with_capacity(paths.len());
+let mut buffers = polars_io::csv::read::buffer::init_buffers(
+&(0..hive_schema.len()).collect::<Vec<_>>(),
+paths.len(),
+hive_schema.as_ref(),
+None,
+polars_io::prelude::CsvEncoding::Utf8,
+false,
+)?;

for path in paths {
let path = path.to_str().unwrap();

-let column_stats = get_hive_parts_iter!(path)
-.map(|(name, value)| {
-let Some(dtype) = hive_schema.as_ref().get(name) else {
-polars_bail!(
-SchemaFieldNotFound:
-"path contains column not present in the given Hive schema: {:?}, path = {:?}",
-name,
-path
-)
-};
-
-Ok(ColumnStats::from_column_literal(value_to_series(
-name,
-value,
-Some(dtype),
-)?))
-})
-.collect::<PolarsResult<Vec<_>>>()?;
+for (name, value) in get_hive_parts_iter!(path) {
+let Some(index) = hive_schema.index_of(name) else {
+polars_bail!(
+SchemaFieldNotFound:
+"path contains column not present in the given Hive schema: {:?}, path = {:?}",
+name,
+path
+)
+};
+
+let buf = buffers.get_mut(index).unwrap();
+
+if !value.is_empty() && value != "__HIVE_DEFAULT_PARTITION__" {
+buf.add(value.as_bytes(), false, false, false)?;
+} else {
+buf.add_null(false);
+}
+}
+}
+
+let mut hive_partitions = Vec::with_capacity(paths.len());
+let buffers = buffers
+.into_iter()
+.map(|x| x.into_series())
+.collect::<PolarsResult<Vec<_>>>()?;
+
+#[allow(clippy::needless_range_loop)]
+for i in 0..paths.len() {
+let column_stats = buffers
+.iter()
+.map(|x| {
+ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as IdxSize]) })
+})
-.collect::<PolarsResult<Vec<_>>>()?;
+.collect::<Vec<_>>();

if column_stats.is_empty() {
polars_bail!(
ComputeError: "expected Hive partitioned path, got {}\n\n\
This error occurs if some paths are Hive partitioned and some paths are not.",
-path
+paths[i].to_str().unwrap(),
)
}

let stats = BatchStats::new(hive_schema.clone(), column_stats, None);

hive_partitions.push(HivePartitions { stats });
}

@@ -215,33 +268,3 @@ fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {

Some((name, value))
}
-
-/// Parse a string value into a single-value [`Series`].
-fn value_to_series(name: &str, value: &str, dtype: Option<&DataType>) -> PolarsResult<Series> {
-let fn_err = || polars_err!(ComputeError: "unable to parse Hive partition value: {:?}", value);
-
-let mut s = if INTEGER_RE.is_match(value) {
-let value = value.parse::<i64>().map_err(|_| fn_err())?;
-Series::new(name, &[value])
-} else if BOOLEAN_RE.is_match(value) {
-let value = value.parse::<bool>().map_err(|_| fn_err())?;
-Series::new(name, &[value])
-} else if FLOAT_RE.is_match(value) {
-let value = value.parse::<f64>().map_err(|_| fn_err())?;
-Series::new(name, &[value])
-} else if value == "__HIVE_DEFAULT_PARTITION__" {
-Series::new_null(name, 1)
-} else {
-let value = percent_decode_str(value)
-.decode_utf8()
-.map_err(|_| fn_err())?;
-Series::new(name, &[value])
-};
-
-// TODO: Avoid expensive logic above when dtype is known
-if let Some(dt) = dtype {
-s = s.strict_cast(dt)?;
-}
-
-Ok(s)
-}
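Taken together, the hive.rs rewrite percent-decodes each path, feeds partition values through the CSV read buffers, skips empty and `__HIVE_DEFAULT_PARTITION__` values during schema inference, and inserts nulls for them when materializing. A minimal Python sketch of the expected end-to-end behavior (the directory layout and column names are illustrative, not taken from this PR):

```python
import os

import polars as pl

# Hypothetical hive-partitioned layout: a `date` key plus a default partition.
for part in ["date=2024-01-01", "date=2024-01-02", "date=__HIVE_DEFAULT_PARTITION__"]:
    os.makedirs(f"data/{part}", exist_ok=True)
    pl.DataFrame({"x": [1]}).write_parquet(f"data/{part}/0.parquet")

lf = pl.scan_parquet("data/**/*.parquet", hive_partitioning=True)

# Expected after this change: `date` is inferred as pl.Date, and the
# __HIVE_DEFAULT_PARTITION__ entry surfaces as a null instead of a parse error.
print(lf.collect())
```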
10 changes: 10 additions & 0 deletions py-polars/polars/io/parquet/functions.py
@@ -44,6 +44,7 @@ def read_parquet(
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
+try_parse_hive_dates: bool = True,
rechunk: bool = False,
low_memory: bool = False,
storage_options: dict[str, Any] | None = None,
@@ -94,6 +95,8 @@
.. warning::
This functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
+try_parse_hive_dates
+    Whether to try parsing hive values as date/datetime types.
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.
@@ -182,6 +185,7 @@
use_statistics=use_statistics,
hive_partitioning=hive_partitioning,
hive_schema=hive_schema,
+try_parse_hive_dates=try_parse_hive_dates,
rechunk=rechunk,
low_memory=low_memory,
cache=False,
@@ -294,6 +298,7 @@ def scan_parquet(
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
+try_parse_hive_dates: bool = True,
rechunk: bool = False,
low_memory: bool = False,
cache: bool = True,
@@ -336,6 +341,8 @@
.. warning::
This functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
+try_parse_hive_dates
+    Whether to try parsing hive values as date/datetime types.
rechunk
In case of reading multiple files via a glob pattern rechunk the final DataFrame
into contiguous memory chunks.
@@ -404,6 +411,7 @@
use_statistics=use_statistics,
hive_partitioning=hive_partitioning,
hive_schema=hive_schema,
+try_parse_hive_dates=try_parse_hive_dates,
retries=retries,
glob=glob,
)
@@ -424,6 +432,7 @@ def _scan_parquet_impl(
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
+try_parse_hive_dates: bool = True,
retries: int = 0,
) -> LazyFrame:
if isinstance(source, list):
@@ -451,6 +460,7 @@
use_statistics=use_statistics,
hive_partitioning=hive_partitioning,
hive_schema=hive_schema,
+try_parse_hive_dates=try_parse_hive_dates,
retries=retries,
glob=glob,
)
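A short usage sketch for the new keyword, reusing the illustrative layout from the sketch above: by default hive date/datetime values are parsed into temporal dtypes, and `try_parse_hive_dates=False` falls back to `String` for those columns (matching the `DataType::String` fallback in hive.rs).

```python
import polars as pl

# Default (try_parse_hive_dates=True): "date=2024-01-01" infers as pl.Date.
lf = pl.scan_parquet("data/**/*.parquet", hive_partitioning=True)
print(lf.schema)  # expected: {'x': Int64, 'date': Date}

# Opt out: the `date` partition column stays pl.String.
lf_raw = pl.scan_parquet(
    "data/**/*.parquet",
    hive_partitioning=True,
    try_parse_hive_dates=False,
)
print(lf_raw.schema)  # expected: {'x': Int64, 'date': String}
```

`read_parquet` accepts the same keyword per this PR.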