Skip to content

Commit

Permalink
Optimize struct and named_struct functions (#11688)
Browse files Browse the repository at this point in the history
* Remove unnecessary heap allocations in implementation of `named_struct_expr` caused by zipping then unzipping fields and values.

* Change implementation of `array_struct` to reduce number of allocations

* Remove tests already covered by `struct.slt`
  • Loading branch information
Rafferty97 authored Sep 3, 2024
1 parent 6bceeae commit bf6c82f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 68 deletions.
21 changes: 9 additions & 12 deletions datafusion/functions/src/core/named_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,17 @@ fn named_struct_expr(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}
}

let arrays = ColumnarValue::values_to_arrays(&values)?;

let fields = names
let fields: Fields = names
.into_iter()
.zip(arrays)
.map(|(name, value)| {
(
Arc::new(Field::new(name, value.data_type().clone(), true)),
value,
)
})
.collect::<Vec<_>>();
.zip(&values)
.map(|(name, value)| Arc::new(Field::new(name, value.data_type().clone(), true)))
.collect::<Vec<_>>()
.into();

let arrays = ColumnarValue::values_to_arrays(&values)?;

Ok(ColumnarValue::Array(Arc::new(StructArray::from(fields))))
let struct_array = StructArray::new(fields, arrays, None);
Ok(ColumnarValue::Array(Arc::new(struct_array)))
}

#[derive(Debug)]
Expand Down
68 changes: 12 additions & 56 deletions datafusion/functions/src/core/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,31 @@ fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
return exec_err!("struct requires at least one argument");
}

let vec: Vec<_> = args
let fields = args
.iter()
.enumerate()
.map(|(i, arg)| {
let field_name = format!("c{i}");
Ok((
Arc::new(Field::new(
field_name.as_str(),
arg.data_type().clone(),
true,
)),
Arc::clone(arg),
))
Ok(Arc::new(Field::new(
field_name.as_str(),
arg.data_type().clone(),
true,
)))
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()?
.into();

Ok(Arc::new(StructArray::from(vec)))
let arrays = args.to_vec();

Ok(Arc::new(StructArray::new(fields, arrays, None)))
}

/// put values in a struct array.
fn struct_expr(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = ColumnarValue::values_to_arrays(args)?;
Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?))
}

#[derive(Debug)]
pub struct StructFunc {
signature: Signature,
Expand Down Expand Up @@ -97,48 +98,3 @@ impl ScalarUDFImpl for StructFunc {
struct_expr(args)
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int64Array;
use datafusion_common::cast::as_struct_array;
use datafusion_common::ScalarValue;

#[test]
fn test_struct() {
// struct(1, 2, 3) = {"c0": 1, "c1": 2, "c2": 3}
let args = [
ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(3))),
];
let struc = struct_expr(&args)
.expect("failed to initialize function struct")
.into_array(1)
.expect("Failed to convert to array");
let result =
as_struct_array(&struc).expect("failed to initialize function struct");
assert_eq!(
&Int64Array::from(vec![1]),
Arc::clone(result.column_by_name("c0").unwrap())
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
);
assert_eq!(
&Int64Array::from(vec![2]),
Arc::clone(result.column_by_name("c1").unwrap())
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
);
assert_eq!(
&Int64Array::from(vec![3]),
Arc::clone(result.column_by_name("c2").unwrap())
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
);
}
}

0 comments on commit bf6c82f

Please sign in to comment.