diff --git a/.cargo/config.toml b/.cargo/config.toml
index 1f678a632560..4b6473049ab8 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,3 +1,11 @@
 [env]
 # Tune jemalloc (https://github.com/pola-rs/polars/issues/18088).
 JEMALLOC_SYS_WITH_MALLOC_CONF = "dirty_decay_ms:500,muzzy_decay_ms:-1"
+
+[target.'cfg(all())']
+rustflags = [
+  "-C", "link-arg=-Wl,-rpath,.../pyenv.git/versions/3.10.15/lib",
+  "-C", "link-arg=-L.../pyenv.git/versions/3.10.15/lib",
+  "-C", "link-arg=-lpython3.10",
+]
+
diff --git a/Cargo.lock b/Cargo.lock
index dbd70821fd27..98215d153419 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -167,6 +167,9 @@ name = "arrayvec"
 version = "0.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "arrow-array"
@@ -3209,6 +3212,7 @@ name = "polars-plan"
 version = "0.43.1"
 dependencies = [
  "ahash",
+ "arrayvec",
  "bitflags",
  "bytemuck",
  "bytes",
diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml
index d41be032bcf0..2680d03fae4b 100644
--- a/crates/polars-plan/Cargo.toml
+++ b/crates/polars-plan/Cargo.toml
@@ -41,8 +41,9 @@ rayon = { workspace = true }
 recursive = { workspace = true }
 regex = { workspace = true, optional = true }
 serde = { workspace = true, features = ["rc"], optional = true }
-serde_json = { workspace = true, optional = true }
+serde_json = { workspace = true, optional = false }
 strum_macros = { workspace = true }
+arrayvec = { version = "0.7.6", features = ["serde"] }
 
 [build-dependencies]
 version_check = { workspace = true }
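Note on the dependency changes above: the new ArrayKwargs (introduced below) stores the target dtype as a fixed-capacity ArrayString<256> so the kwargs struct stays Copy, Hash and serializable, which is why arrayvec is pulled in with its "serde" feature and serde_json becomes non-optional. A minimal standalone sketch of the round-trip this enables (not part of the patch; assumes arrayvec and serde_json as declared above):

use arrayvec::ArrayString;

fn main() {
    // Fixed-capacity, Copy-able string; with arrayvec's "serde" feature it
    // round-trips through serde_json as a plain JSON string value.
    let s: ArrayString<256> = ArrayString::from("{\"DtypeColumn\":[\"Float64\"]}").unwrap();
    let json = serde_json::to_string(&s).unwrap();
    let back: ArrayString<256> = serde_json::from_str(&json).unwrap();
    assert_eq!(s, back);
}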
diff --git a/crates/polars-plan/src/dsl/array.rs b/crates/polars-plan/src/dsl/array.rs
index 23faf363421c..bc450aceed2c 100644
--- a/crates/polars-plan/src/dsl/array.rs
+++ b/crates/polars-plan/src/dsl/array.rs
@@ -1,10 +1,11 @@
+use arrayvec::ArrayString;
 use polars_core::prelude::*;
 #[cfg(feature = "array_to_struct")]
 use polars_ops::chunked_array::array::{
     arr_default_struct_name_gen, ArrToStructNameGenerator, ToStruct,
 };
 
-use crate::dsl::function_expr::ArrayFunction;
+use crate::dsl::function_expr::{ArrayFunction, ArrayKwargs};
 use crate::prelude::*;
 
 /// Specialized expressions for [`Series`] of [`DataType::Array`].
@@ -194,3 +195,32 @@ impl ArrayNameSpace {
         )
     }
 }
+
+pub fn array_from_expr<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(
+    s: E,
+    dtype_str: &str,
+) -> PolarsResult<Expr> {
+    let s: Vec<_> = s.as_ref().iter().map(|e| e.clone().into()).collect();
+
+    polars_ensure!(!s.is_empty(), ComputeError: "`array` needs one or more expressions");
+
+    // let mut kwargs = ArrayKwargs::default();
+    // const max_sz: usize = kwargs.dtype_expr.capacity();
+    const MAX_SZ: usize = 256; // hardcoded for now; planned to be replaced anyway
+    let mut trunc_str = dtype_str.to_string();
+    trunc_str.truncate(MAX_SZ);
+    let fixed_string = ArrayString::<{ MAX_SZ }>::from(&trunc_str).unwrap();
+    let kwargs = ArrayKwargs {
+        dtype_expr: fixed_string,
+    };
+
+    Ok(Expr::Function {
+        input: s,
+        function: FunctionExpr::ArrayExpr(ArrayFunction::Array(kwargs)),
+        options: FunctionOptions {
+            collect_groups: ApplyOptions::ElementWise,
+            flags: FunctionFlags::default() | FunctionFlags::INPUT_WILDCARD_EXPANSION,
+            ..Default::default()
+        },
+    })
+}
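The dtype_str accepted by array_from_expr above is expected to be the serde_json form of an Expr::DtypeColumn carrying exactly one dtype; deserialize_dtype in the next file parses it back out. A sketch of how such a string can be produced (assumes the serde feature of polars-plan is enabled; the resulting literal matches the one used in the tests):

use polars_core::prelude::DataType;
use polars_plan::dsl::Expr;

fn main() {
    // The default externally tagged serde layout of the enum variant yields
    // the same JSON used throughout the tests: {"DtypeColumn":["Float64"]}
    let e = Expr::DtypeColumn(vec![DataType::Float64]);
    let json = serde_json::to_string(&e).unwrap();
    assert_eq!(json, r#"{"DtypeColumn":["Float64"]}"#);
}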
diff --git a/crates/polars-plan/src/dsl/function_expr/array.rs b/crates/polars-plan/src/dsl/function_expr/array.rs
index dce6d44bce94..88baa90cc80a 100644
--- a/crates/polars-plan/src/dsl/function_expr/array.rs
+++ b/crates/polars-plan/src/dsl/function_expr/array.rs
@@ -1,8 +1,21 @@
+use std::collections::HashMap;
+
+use arrayvec::ArrayString;
+use arrow::array::{FixedSizeListArray, PrimitiveArray};
+use arrow::bitmap::MutableBitmap;
+use polars_core::with_match_physical_numeric_polars_type;
 use polars_ops::chunked_array::array::*;
 
 use super::*;
 use crate::{map, map_as_slice};
 
+#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash, Default, Serialize, Deserialize)]
+pub struct ArrayKwargs {
+    // Not sure how to get a serializable DataType here;
+    // for the prototype, use a fixed-size string.
+    pub dtype_expr: ArrayString<256>,
+}
+
 #[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum ArrayFunction {
@@ -19,6 +32,7 @@ pub enum ArrayFunction {
     Any,
     #[cfg(feature = "array_any_all")]
     All,
+    Array(ArrayKwargs),
     Sort(SortOptions),
     Reverse,
     ArgMin,
@@ -46,6 +60,7 @@ impl ArrayFunction {
             Median => mapper.map_to_float_dtype(),
             #[cfg(feature = "array_any_all")]
             Any | All => mapper.with_dtype(DataType::Boolean),
+            Array(kwargs) => array_output_type(mapper.args(), kwargs),
             Sort(_) => mapper.with_same_dtype(),
             Reverse => mapper.with_same_dtype(),
             ArgMin | ArgMax => mapper.with_dtype(IDX_DTYPE),
@@ -60,6 +75,49 @@ impl ArrayFunction {
     }
 }
 
+fn deserialize_dtype(dtype_expr: &str) -> PolarsResult<Option<DataType>> {
+    match dtype_expr.len() {
+        0 => Ok(None),
+        _ => match serde_json::from_str::<Expr>(dtype_expr) {
+            Ok(Expr::DtypeColumn(dtypes)) if dtypes.len() == 1 => Ok(Some(dtypes[0].clone())),
+            Ok(_) => Err(
+                polars_err!(ComputeError: "expected a DtypeColumn expression with a single dtype"),
+            ),
+            Err(_) => Err(polars_err!(ComputeError: "could not deserialize dtype expression")),
+        },
+    }
+}
+
+fn get_expected_dtype(inputs: &[DataType], kwargs: &ArrayKwargs) -> PolarsResult<DataType> {
+    // Decide which dtype to use for the constructed array:
+    // for now, use the dtype from kwargs if one is specified,
+    // otherwise use the dtype of the first column.
+    //
+    // An alternative would be to call try_get_supertype for the input types,
+    // or logic like DataFrame::get_supertype_all.
+    // The problem is that such a cast may be too general; we may only want to
+    // support primitive types, and String is not supported yet.
+    let expected_dtype = deserialize_dtype(&kwargs.dtype_expr)?.unwrap_or(inputs[0].clone());
+    Ok(expected_dtype)
+}
+
+fn array_output_type(input_fields: &[Field], kwargs: &ArrayKwargs) -> PolarsResult<Field> {
+    // The expected target type is either the provided dtype or the dtype of the first column.
+    let dtypes: Vec<DataType> = input_fields.iter().map(|f| f.dtype().clone()).collect();
+    let expected_dtype = get_expected_dtype(&dtypes, kwargs)?;
+
+    for field in input_fields.iter() {
+        if !field.dtype().is_numeric() {
+            polars_bail!(ComputeError: "all input fields must be numeric")
+        }
+    }
+
+    Ok(Field::new(
+        PlSmallStr::from_static("array"),
+        DataType::Array(Box::new(expected_dtype), input_fields.len()),
+    ))
+}
+
 fn map_array_dtype_to_list_dtype(datatype: &DataType) -> PolarsResult<DataType> {
     if let DataType::Array(inner, _) = datatype {
         Ok(DataType::List(inner.clone()))
@@ -85,6 +143,7 @@ impl Display for ArrayFunction {
             Any => "any",
             #[cfg(feature = "array_any_all")]
             All => "all",
+            Array(_) => "array",
             Sort(_) => "sort",
             Reverse => "reverse",
             ArgMin => "arg_min",
@@ -118,6 +177,7 @@ impl From<ArrayFunction> for SpecialEq<Arc<dyn ColumnsUdf>> {
             Any => map!(any),
             #[cfg(feature = "array_any_all")]
             All => map!(all),
+            Array(kwargs) => map_as_slice!(array_new, kwargs),
             Sort(options) => map!(sort, options),
             Reverse => map!(reverse),
             ArgMin => map!(arg_min),
@@ -133,6 +193,100 @@ impl From<ArrayFunction> for SpecialEq<Arc<dyn ColumnsUdf>> {
     }
 }
 
+// Create a new array from a slice of series
+fn array_new(inputs: &[Column], kwargs: ArrayKwargs) -> PolarsResult<Column> {
+    array_internal(inputs, kwargs)
+}
+fn array_internal(inputs: &[Column], kwargs: ArrayKwargs) -> PolarsResult<Column> {
+    let dtypes: Vec<DataType> = inputs.iter().map(|f| f.dtype().clone()).collect();
+    let expected_dtype = get_expected_dtype(&dtypes, &kwargs)?;
+
+    // This conversion is clunky; there is probably a standard way to go from &[Column] to &[Series].
+    let series: Vec<Series> = inputs
+        .iter()
+        .map(|col| col.clone().take_materialized_series())
+        .collect();
+
+    // Convert the dtype to a native numeric type and invoke array_numeric.
+    let res_series = with_match_physical_numeric_polars_type!(expected_dtype, |$T| {
+        array_numeric::<$T>(&series[..], &expected_dtype)
+    })?;
+
+    Ok(res_series.into_column())
+}
+
+// Combine numeric series into an array
+fn array_numeric<T: PolarsNumericType>(
+    inputs: &[Series],
+    dtype: &DataType,
+) -> PolarsResult<Series> {
+    let rows = inputs[0].len();
+    let cols = inputs.len();
+    let capacity = cols * rows;
+
+    let mut values: Vec<T::Native> = vec![T::Native::default(); capacity];
+
+    // Support casting: cast input fields to the target dtype as needed.
+    let mut casts = HashMap::new();
+    for j in 0..cols {
+        if inputs[j].dtype() != dtype {
+            let cast_input = inputs[j].cast(dtype)?;
+            casts.insert(j, cast_input);
+        }
+    }
+
+    let mut cols_ca = Vec::new();
+    for j in 0..cols {
+        if inputs[j].dtype() != dtype {
+            cols_ca.push(casts.get(&j).expect("expect conversion").unpack::<T>()?);
+        } else {
+            cols_ca.push(inputs[j].unpack::<T>()?);
+        }
+    }
+
+    for i in 0..rows {
+        for j in 0..cols {
+            values[i * cols + j] = unsafe { cols_ca[j].value_unchecked(i) };
+        }
+    }
+
+    let validity = if cols_ca.iter().any(|col| col.has_nulls()) {
+        let mut validity = MutableBitmap::from_len_zeroed(capacity);
+        for (j, col) in cols_ca.iter().enumerate() {
+            let mut row_offset = 0;
+            for chunk in col.chunks() {
+                if let Some(chunk_validity) = chunk.validity() {
+                    for set_bit in chunk_validity.true_idx_iter() {
+                        validity.set(cols * (row_offset + set_bit) + j, true);
+                    }
+                } else {
+                    for chunk_row in 0..chunk.len() {
+                        validity.set(cols * (row_offset + chunk_row) + j, true);
+                    }
+                }
+                row_offset += chunk.len();
+            }
+        }
+        Some(validity.into())
+    } else {
+        None
+    };
+
+    let values_array = PrimitiveArray::from_vec(values).with_validity(validity);
+    let dtype = DataType::Array(Box::new(dtype.clone()), cols);
+    let arrow_dtype = dtype.to_arrow(CompatLevel::newest());
+    let array = FixedSizeListArray::try_new(
+        arrow_dtype.clone(),
+        values_array.len(),
+        Box::new(values_array),
+        None,
+    )?;
+    Ok(unsafe {
+        Series::_try_from_arrow_unchecked("Array".into(), vec![Box::new(array)], &arrow_dtype)?
+    })
+}
+
 pub(super) fn max(s: &Column) -> PolarsResult<Column> {
     Ok(s.array()?.array_max().into())
 }
@@ -249,3 +403,102 @@ pub(super) fn shift(s: &[Column]) -> PolarsResult<Column> {
 
     ca.array_shift(n.as_materialized_series()).map(Column::from)
 }
+
+#[cfg(test)]
+mod test {
+    use polars_core::datatypes::Field;
+    use polars_core::frame::DataFrame;
+    use polars_core::prelude::{Column, Series};
+
+    use super::*;
+
+    #[test]
+    fn test_array_f64() {
+        println!("\ntest_array_f64");
+        let f1 = Series::new("f1".into(), &[1.0, 2.0]);
+        let f2 = Series::new("f2".into(), &[3.0, 4.0]);
+
+        let mut cols: Vec<Column> = Vec::new();
+        cols.push(Column::Series(f1));
+        cols.push(Column::Series(f2));
+
+        let array_df = DataFrame::new(cols.clone()).unwrap();
+        println!("input df\n{}\n", &array_df);
+
+        let mut fields: Vec<Field> = Vec::new();
+        for col in &cols {
+            let f: Field = (col.field().to_mut()).clone();
+            fields.push(f);
+        }
+        let kwargs = crate::dsl::function_expr::array::ArrayKwargs {
+            dtype_expr: arrayvec::ArrayString::<256>::from("{\"DtypeColumn\":[\"Float64\"]}").unwrap(),
+        };
+        let expected_result =
+            crate::dsl::function_expr::array::array_output_type(&fields, &kwargs).unwrap();
+        println!("expected result\n{:?}\n", &expected_result);
+
+        let new_arr =
+            crate::dsl::function_expr::array::array_internal(array_df.get_columns(), kwargs);
+        println!("actual result\n{:?}", &new_arr);
+
+        assert!(new_arr.is_ok());
+        assert_eq!(new_arr.unwrap().dtype(), expected_result.dtype());
+    }
+
+    fn i32_series() -> (Vec<Column>, Vec<Field>, DataFrame) {
+        let f1 = Series::new("f1".into(), &[1, 2]);
+        let f2 = Series::new("f2".into(), &[3, 4]);
+
+        let mut cols: Vec<Column> = Vec::new();
+        cols.push(Column::Series(f1));
+        cols.push(Column::Series(f2));
+
+        let array_df = DataFrame::new(cols.clone()).unwrap();
+        println!("input df\n{}\n", &array_df);
+
+        let mut fields: Vec<Field> = Vec::new();
+        for col in &cols {
+            let f: Field = (col.field().to_mut()).clone();
+            fields.push(f);
+        }
+        (cols, fields, array_df)
+    }
+
+    #[test]
+    fn test_array_i32() {
+        println!("\ntest_array_i32");
+        let (_cols, fields, array_df) = i32_series();
+        let kwargs = crate::dsl::function_expr::array::ArrayKwargs {
+            dtype_expr: arrayvec::ArrayString::<256>::from("{\"DtypeColumn\":[\"Int32\"]}").unwrap(),
+        };
+        let expected_result =
+            crate::dsl::function_expr::array::array_output_type(&fields, &kwargs).unwrap();
+        println!("expected result\n{:?}\n", &expected_result);
+
+        let new_arr =
+            crate::dsl::function_expr::array::array_internal(array_df.get_columns(), kwargs);
+        println!("actual result\n{:?}", &new_arr);
+
+        assert!(new_arr.is_ok());
+        assert_eq!(new_arr.unwrap().dtype(), expected_result.dtype());
+    }
+
+    #[test]
+    fn test_array_i32_converted() {
+        println!("\ntest_array_i32_converted");
+        let (_cols, fields, array_df) = i32_series();
+        let kwargs = crate::dsl::function_expr::array::ArrayKwargs {
+            dtype_expr: arrayvec::ArrayString::<256>::from("{\"DtypeColumn\":[\"Float64\"]}").unwrap(),
+        };
+        let expected_result =
+            crate::dsl::function_expr::array::array_output_type(&fields, &kwargs).unwrap();
+        println!("expected result\n{:?}\n", &expected_result);
+
+        let new_arr =
+            crate::dsl::function_expr::array::array_internal(array_df.get_columns(), kwargs);
+        println!("actual result\n{:?}", &new_arr);
+
+        assert!(new_arr.is_ok());
+        assert_eq!(new_arr.unwrap().dtype(), expected_result.dtype());
+    }
+}
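For reference, the layout array_numeric builds: row i of the output array holds element i of every input column, so the flat values buffer is written at index i * cols + j, and the validity bit for (row i, column j) lives at cols * i + j. A dependency-free sketch of that interleaving (the interleave helper is hypothetical, not part of the patch):

fn interleave(columns: &[Vec<f64>]) -> Vec<f64> {
    let cols = columns.len();
    let rows = columns[0].len();
    let mut values = vec![0.0; rows * cols];
    for (j, col) in columns.iter().enumerate() {
        for (i, v) in col.iter().enumerate() {
            // Same index math as array_numeric: row-major with `cols` as the stride.
            values[i * cols + j] = *v;
        }
    }
    values
}

fn main() {
    // Columns [1, 2] and [3, 4] become rows [1, 3] and [2, 4], i.e. the flat
    // buffer behind an Array column of width 2, matching the unit tests above.
    assert_eq!(
        interleave(&[vec![1.0, 2.0], vec![3.0, 4.0]]),
        vec![1.0, 3.0, 2.0, 4.0]
    );
}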
diff --git a/crates/polars-plan/src/dsl/function_expr/mod.rs b/crates/polars-plan/src/dsl/function_expr/mod.rs
index 0458b2b4a1d0..ad641c3a22c9 100644
--- a/crates/polars-plan/src/dsl/function_expr/mod.rs
+++ b/crates/polars-plan/src/dsl/function_expr/mod.rs
@@ -76,7 +76,7 @@ use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 
 #[cfg(feature = "dtype-array")]
-pub(crate) use array::ArrayFunction;
+pub(crate) use array::{ArrayFunction, ArrayKwargs};
 #[cfg(feature = "cov")]
 pub(crate) use correlation::CorrelationMethod;
 #[cfg(feature = "fused")]
diff --git a/crates/polars-python/src/functions/lazy.rs b/crates/polars-python/src/functions/lazy.rs
index cb826a699551..e7ab738eea13 100644
--- a/crates/polars-python/src/functions/lazy.rs
+++ b/crates/polars-python/src/functions/lazy.rs
@@ -199,6 +199,13 @@ pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
     Ok(expr.into())
 }
 
+#[pyfunction]
+pub fn array(s: Vec<PyExpr>, dtype_str: &str) -> PyResult<PyExpr> {
+    let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
+    let expr = dsl::array_from_expr(s, dtype_str).map_err(PyPolarsErr::from)?;
+    Ok(expr.into())
+}
+
 #[pyfunction]
 pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
     let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py
index 10f0ee54228b..db2710b15663 100644
--- a/py-polars/polars/__init__.py
+++ b/py-polars/polars/__init__.py
@@ -80,6 +80,7 @@
     arctan2d,
     arg_sort_by,
     arg_where,
+    array,
     business_day_count,
     coalesce,
     col,
@@ -315,6 +316,7 @@
     "collect_all",
     "collect_all_async",
     "concat_list",
+    "array",
     "concat_str",
     "corr",
     "count",
diff --git a/py-polars/polars/functions/__init__.py b/py-polars/polars/functions/__init__.py
index fedd0ac2bff0..24c9fcee2fe9 100644
--- a/py-polars/polars/functions/__init__.py
+++ b/py-polars/polars/functions/__init__.py
@@ -14,6 +14,7 @@
     sum_horizontal,
 )
 from polars.functions.as_datatype import (
+    array,
     concat_list,
     concat_str,
     duration,
@@ -124,6 +125,7 @@
     "collect_all",
     "collect_all_async",
     "concat_list",
+    "array",
     "concat_str",
     "corr",
     "count",
diff --git a/py-polars/polars/functions/as_datatype.py b/py-polars/polars/functions/as_datatype.py
index 30398daa01d9..ffda41f959b4 100644
--- a/py-polars/polars/functions/as_datatype.py
+++ b/py-polars/polars/functions/as_datatype.py
@@ -502,6 +502,30 @@ def concat_list(exprs: IntoExpr | Iterable[IntoExpr], *more_exprs: IntoExpr) ->
     return wrap_expr(plr.concat_list(exprs))
 
 
+def array(
+    exprs: IntoExpr | Iterable[IntoExpr], *more_exprs: IntoExpr, dtype: str = ""
+) -> Expr:
+    """
+    Horizontally concatenate columns into a single array column.
+
+    Operates in linear time.
+
+    Parameters
+    ----------
+    exprs
+        Columns to concatenate into a single array column. Accepts expression input.
+        Strings are parsed as column names, other non-expression inputs are parsed as
+        literals.
+    *more_exprs
+        Additional columns to concatenate into a single array column, specified as
+        positional arguments.
+    dtype
+        The target dtype, given as a dtype expression serialized to JSON, e.g.
+        ``'{"DtypeColumn":["Float64"]}'``. If empty, the dtype of the first column is used.
+    """
+    exprs = parse_into_list_of_expressions(exprs, *more_exprs)
+    return wrap_expr(plr.array(exprs, dtype))
+
+
 @overload
 def struct(
     *exprs: IntoExpr | Iterable[IntoExpr],
diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs
index 1c645738102a..896105a3bc96 100644
--- a/py-polars/src/lib.rs
+++ b/py-polars/src/lib.rs
@@ -170,6 +170,7 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
         .unwrap();
     m.add_wrapped(wrap_pyfunction!(functions::concat_list))
         .unwrap();
+    m.add_wrapped(wrap_pyfunction!(functions::array)).unwrap();
     m.add_wrapped(wrap_pyfunction!(functions::concat_str))
         .unwrap();
     m.add_wrapped(wrap_pyfunction!(functions::len)).unwrap();
diff --git a/py-polars/tests/unit/functions/as_datatype/test_array.py b/py-polars/tests/unit/functions/as_datatype/test_array.py
new file mode 100644
index 000000000000..45e96bf4e6c1
--- /dev/null
+++ b/py-polars/tests/unit/functions/as_datatype/test_array.py
@@ -0,0 +1,94 @@
+import polars as pl
+from polars.testing import assert_series_equal
+
+
+def test_array() -> None:
+    s0 = pl.Series("a", [1.0, 2.0], dtype=pl.Float64)
+    s1 = pl.Series("b", [3.0, 4.0], dtype=pl.Float64)
+    expected_f64 = pl.Series(
+        "z", [[1.0, 3.0], [2.0, 4.0]], dtype=pl.Array(pl.Float64, 2)
+    )
+    expected_i32 = pl.Series(
+        "z", [[1.0, 3.0], [2.0, 4.0]], dtype=pl.Array(pl.Int32, 2), strict=False
+    )
+    df = pl.DataFrame([s0, s1])
+    print("\n")
+
+    result = df.select(pl.array(["a", "b"]).alias("z"))["z"]
+    print("No Cast")
+    print(result)
+    print()
+    assert_series_equal(result, expected_f64)
+
+    result = df.select(
+        pl.array(["a", "b"], dtype='{"DtypeColumn":["Float64"]}').alias("z")
+    )["z"]
+    print("Cast to Float64")
+    print(result)
+    print()
+    assert_series_equal(result, expected_f64)
+
+    result = df.select(
+        pl.array(["a", "b"], dtype='{"DtypeColumn":["Int32"]}').alias("z")
+    )["z"]
+    print("Cast to Int32")
+    print(result)
+    print()
+    assert_series_equal(result, expected_i32)
+
+
+def test_array_empty() -> None:
+    s0 = pl.Series("a", [1.0, 2.0], dtype=pl.Float64)
+    s1 = pl.Series("b", [3.0, 4.0], dtype=pl.Float64)
+    expected_f64 = pl.Series(
+        "z", [[1.0, 3.0], [2.0, 4.0]], dtype=pl.Array(pl.Float64, 2)
+    )
+    df = pl.DataFrame([s0, s1])
+    print("\n")
+
+    result = df.select(pl.array([], dtype='{"DtypeColumn":["Float64"]}').alias("z"))["z"]
+    print("Empty")
+    print(result)
+    print()
+    # assert_series_equal(result, expected_f64)
+
+
+def test_array_nulls() -> None:
+    s0 = pl.Series("a", [1.0, None], dtype=pl.Float64)
+    s1 = pl.Series("b", [None, 4.0], dtype=pl.Float64)
+    expected_f64 = pl.Series(
+        "z", [[1.0, None], [None, 4.0]], dtype=pl.Array(pl.Float64, 2)
+    )
+    df = pl.DataFrame([s0, s1])
+    print("\n")
+
+    result = df.select(pl.array(["a", "b"]).alias("z"))["z"]
+    print("No Cast")
+    print(result)
+    print()
+    assert_series_equal(result, expected_f64)
+
+
+def test_array_string() -> None:
+    s0 = pl.Series("a", ["1", "2"])
+    s1 = pl.Series("b", ["3", "4"])
+    expected_f64 = pl.Series(
+        "z", [[1.0, 3.0], [2.0, 4.0]], dtype=pl.Array(pl.Float64, 2)
+    )
+    df = pl.DataFrame([s0, s1])
+    print("\n")
+
+    result = df.select(pl.array(["a", "b"]).alias("z"))["z"]
+    print("No Cast")
+    print(result)
+    print()
+    assert_series_equal(result, expected_f64)
+
+    result = df.select(
+        pl.array(["a", "b"], dtype='{"DtypeColumn":["Float64"]}').alias("z")
+    )["z"]
+    print("Cast to Float64")
+    print(result)
+    print()
+    assert_series_equal(result, expected_f64)
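A usage sketch mirroring the tests above (illustrative only; the printed schema repr may differ between Polars versions):

import polars as pl

df = pl.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})

out = df.select(
    # Defaults to the dtype of the first column (Float64 here).
    pl.array(["a", "b"]).alias("z"),
    # Cast via the serialized dtype expression understood by the Rust side.
    pl.array(["a", "b"], dtype='{"DtypeColumn":["Int32"]}').alias("z_i32"),
)
print(out.schema)  # z: Array(Float64, 2), z_i32: Array(Int32, 2)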