Make FirstValue an UDAF, Change AggregateUDFImpl::accumulator signature, support ORDER BY for UDAFs #9874

Merged
merged 49 commits into from
Apr 3, 2024
Changes from 45 commits
Commits
b94f70f
first draft
jayzhan211 Feb 16, 2024
c743d13
clippy fix
jayzhan211 Feb 18, 2024
3a7e965
cleanup
jayzhan211 Feb 18, 2024
4917f56
use one vector for ordering req
jayzhan211 Feb 21, 2024
c9e8641
add sort exprs to accumulator
jayzhan211 Feb 21, 2024
3a5f0d1
clippy
jayzhan211 Feb 21, 2024
a3ea00a
cleanup
jayzhan211 Feb 21, 2024
f349f21
fix doc test
jayzhan211 Feb 21, 2024
6fcdaac
change to ref
jayzhan211 Feb 27, 2024
c3512a6
fix typo
jayzhan211 Feb 27, 2024
092d46e
fix doc
jayzhan211 Feb 27, 2024
8592e6b
fmt
jayzhan211 Mar 1, 2024
0f8fc24
move schema and logical ordering exprs
jayzhan211 Mar 1, 2024
3185f9f
remove redudant info
jayzhan211 Mar 1, 2024
3ecc772
rename
jayzhan211 Mar 1, 2024
faadc63
cleanup
jayzhan211 Mar 1, 2024
7e33910
add ignore nulls
jayzhan211 Mar 7, 2024
cfffcbf
Merge remote-tracking branch 'upstream/main' into udf-order-2
jayzhan211 Mar 25, 2024
6aaa15c
fix conflict
jayzhan211 Mar 25, 2024
b74b7d2
backup
jayzhan211 Mar 26, 2024
263e6cb
complete return_type
jayzhan211 Mar 26, 2024
0a77e4f
complete replace
jayzhan211 Mar 30, 2024
7b26377
split to first value udf
jayzhan211 Mar 30, 2024
4bfd91d
replace accumulator
jayzhan211 Mar 30, 2024
7f54141
fmt
jayzhan211 Mar 30, 2024
6339535
cleanup
jayzhan211 Mar 30, 2024
33ae6ee
small fix
jayzhan211 Mar 30, 2024
b4eb865
remove ordering types
jayzhan211 Mar 30, 2024
d8ab6c5
make state fields more flexible
jayzhan211 Mar 30, 2024
a3bff42
cleanup
jayzhan211 Mar 30, 2024
53465fd
replace done
jayzhan211 Mar 30, 2024
cc21496
cleanup
jayzhan211 Mar 30, 2024
b62544f
cleanup
jayzhan211 Mar 30, 2024
ddfabad
Merge remote-tracking branch 'upstream/main' into first-value-udf
jayzhan211 Mar 30, 2024
4b809b0
rm comments
jayzhan211 Mar 30, 2024
2534727
cleanup
jayzhan211 Mar 30, 2024
17378dd
rm test1
jayzhan211 Mar 30, 2024
dd1c4ba
fix state fields
jayzhan211 Mar 31, 2024
5d5d310
fmt
jayzhan211 Mar 31, 2024
23f20f9
args struct for accumulator
jayzhan211 Mar 31, 2024
b2ba8c3
simplify
jayzhan211 Mar 31, 2024
75aa2fe
add sig
jayzhan211 Mar 31, 2024
5b9625f
add comments
jayzhan211 Mar 31, 2024
d5c3f6f
fmt
jayzhan211 Mar 31, 2024
dc9549a
fix docs
jayzhan211 Apr 1, 2024
7ce3d41
Merge remote-tracking branch 'upstream/main' into first-value-udf
jayzhan211 Apr 1, 2024
49b4a76
use exprs utils
jayzhan211 Apr 1, 2024
d70cce5
rm state type
jayzhan211 Apr 2, 2024
29c4018
add comment
jayzhan211 Apr 2, 2024
8 changes: 5 additions & 3 deletions datafusion-examples/examples/advanced_udaf.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::Schema;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use datafusion_physical_expr::NullState;
use std::{any::Any, sync::Arc};
@@ -30,7 +31,8 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
use datafusion_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
function::AccumulatorArgs, Accumulator, AggregateUDF, AggregateUDFImpl,
GroupsAccumulator, Signature,
};

/// This example shows how to use the full AggregateUDFImpl API to implement a user
@@ -85,7 +87,7 @@ impl AggregateUDFImpl for GeoMeanUdaf {
/// is supported, DataFusion will use this row oriented
/// accumulator when the aggregate function is used as a window function
/// or when there are only aggregates (no GROUP BY columns) in the plan.
fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
Contributor

I was thinking last night about the impact of this API on UDAF writers.

Specifically, I was thinking about the many UDAFs that exist (or will exist) by the time this change is released, and about the first time people encounter and try to use this API. I think the args with data types are much easier to use (and require less mental gymnastics). I am therefore going to propose an easier, beginner-friendly API that requires fewer changes to existing UDAFs and is easier for first-time users.

Contributor

This is what I came up with: #9920

Ok(Box::new(GeometricMean::new()))
}
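
For readers following the signature discussion: the change from a bare `&DataType` parameter to an `AccumulatorArgs` value is an instance of the args-struct pattern. The sketch below only models that pattern with toy types. `DataType`, the field names of `AccumulatorArgs`, and `describe_accumulator` are hypothetical stand-ins chosen for illustration, not the definitions in this PR.

```rust
// Toy stand-ins; none of these are DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Float64,
}

/// Hypothetical args struct: a single parameter that can gain new fields
/// later without changing the method signature again.
struct AccumulatorArgs<'a> {
    data_type: &'a DataType,
    /// Assumed representation: ORDER BY column names as plain strings.
    sort_exprs: &'a [String],
    ignore_nulls: bool,
}

/// Stands in for `accumulator(&self, acc_args)`; it returns a description
/// instead of a boxed accumulator to keep the sketch small.
fn describe_accumulator(args: AccumulatorArgs) -> String {
    let ordering = if args.sort_exprs.is_empty() {
        "unordered".to_string()
    } else {
        format!("ordered by {}", args.sort_exprs.join(", "))
    };
    format!(
        "{:?}, {}, ignore_nulls={}",
        args.data_type, ordering, args.ignore_nulls
    )
}

fn main() {
    let data_type = DataType::Float64;
    let sort_exprs = vec!["ts".to_string()];
    let args = AccumulatorArgs {
        data_type: &data_type,
        sort_exprs: &sort_exprs,
        ignore_nulls: false,
    };
    println!("{}", describe_accumulator(args));

    // An implementation that only needs the data type can ignore the rest.
    let int_type = DataType::Int64;
    let plain = AccumulatorArgs {
        data_type: &int_type,
        sort_exprs: &[],
        ignore_nulls: false,
    };
    println!("{}", describe_accumulator(plain));
}
```

The trade-off raised in the review still applies: callers that only need the data type now have to unpack a struct, which is what the simpler API proposed in #9920 aims to address.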

@@ -191,7 +193,7 @@ impl Accumulator for GeometricMean {

// create local session context with an in-memory table
fn create_context() -> Result<SessionContext> {
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::arrow::datatypes::Field;
use datafusion::datasource::MemTable;
// define a schema.
let schema = Arc::new(Schema::new(vec![
20 changes: 20 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
@@ -69,11 +69,14 @@ use datafusion_common::{
OwnedTableReference, SchemaReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{create_first_value, Signature, Volatility};
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_physical_expr::create_first_value_accumulator;
use datafusion_sql::{
parser::{CopyToSource, CopyToStatement, DFParser},
planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel},
@@ -82,6 +85,7 @@ use datafusion_sql::{

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::debug;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
@@ -1457,6 +1461,22 @@ impl SessionState {
datafusion_functions_array::register_all(&mut new_self)
.expect("can not register array expressions");
Contributor Author

We also panic here if registration fails.


let first_value = create_first_value(
"FIRST_VALUE",
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
Arc::new(create_first_value_accumulator),
);

match new_self.register_udaf(Arc::new(first_value)) {
Ok(Some(existing_udaf)) => {
debug!("Overwrite existing UDF: {}", existing_udaf.name());
}
Ok(None) => {}
Err(err) => {
panic!("Failed to register UDF: {}", err);
Contributor Author

Changing this function to return Result<Self> would be a large change, so I am keeping the panic for now.

}
}

new_self
}
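
The registration code above matches on three outcomes: a previous function was replaced, nothing was replaced, or registration failed. As a rough illustration of that contract, and of what a non-panicking caller could look like if the function were later changed to return a `Result`, here is a minimal sketch. `Udaf`, `Registry`, and `register_and_report` are hypothetical toy types, and the empty-name error is an invented failure condition used only to exercise the error path.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Toy UDAF handle; a hypothetical stand-in for `AggregateUDF`.
#[derive(Debug)]
struct Udaf {
    name: String,
}

/// Toy registry; not DataFusion's function registry.
#[derive(Default)]
struct Registry {
    udafs: HashMap<String, Arc<Udaf>>,
}

impl Registry {
    /// Returns the previously registered function, if any, so the caller
    /// can decide whether an overwrite deserves a log line.
    fn register_udaf(&mut self, udaf: Arc<Udaf>) -> Result<Option<Arc<Udaf>>, String> {
        if udaf.name.is_empty() {
            // Invented failure condition, for illustration only.
            return Err("function name must not be empty".to_string());
        }
        Ok(self.udafs.insert(udaf.name.clone(), udaf))
    }
}

/// Registers a function and reports what happened instead of panicking.
fn register_and_report(registry: &mut Registry, name: &str) -> Result<String, String> {
    let udaf = Arc::new(Udaf { name: name.to_string() });
    match registry.register_udaf(udaf)? {
        Some(existing) => Ok(format!("overwrote {}", existing.name)),
        None => Ok(format!("registered {}", name)),
    }
}

fn main() {
    let mut registry = Registry::default();
    println!("{:?}", register_and_report(&mut registry, "FIRST_VALUE"));
    println!("{:?}", register_and_report(&mut registry, "FIRST_VALUE"));
    println!("{:?}", register_and_report(&mut registry, ""));
}
```

Propagating the error with `?` is the shape the review thread defers to a later change; the PR itself keeps the panic.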
/// Returns new [`SessionState`] using the provided
55 changes: 36 additions & 19 deletions datafusion/core/src/physical_planner.rs
@@ -247,24 +247,20 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
distinct,
args,
filter,
order_by,
order_by: _,
null_treatment: _,
}) => match func_def {
AggregateFunctionDefinition::BuiltIn(..) => {
create_function_physical_name(func_def.name(), *distinct, args)
}
AggregateFunctionDefinition::UDF(fun) => {
// TODO: Add support for filter and order by in AggregateUDF
// TODO: Add support for filter by in AggregateUDF
if filter.is_some() {
return exec_err!(
"aggregate expression with filter is not supported"
);
}
if order_by.is_some() {
return exec_err!(
"aggregate expression with order_by is not supported"
);
}

let names = args
.iter()
.map(|e| create_physical_name(e, false))
@@ -1667,20 +1663,34 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
)?),
None => None,
};
let order_by = match order_by {
Some(e) => Some(create_physical_sort_exprs(
e,
logical_input_schema,
execution_props,
)?),
None => None,
};

let try_create_physical_sort_expr =
|order_by: &Option<Vec<Expr>>| -> Result<Option<Vec<PhysicalSortExpr>>> {
let physical_sort_exprs = match order_by {
Some(e) => Some(
e.iter()
.map(|expr| {
create_physical_sort_expr(
expr,
logical_input_schema,
execution_props,
)
})
.collect::<Result<Vec<_>>>()?,
),
None => None,
};
Ok(physical_sort_exprs)
};

let ignore_nulls = null_treatment
.unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
== NullTreatment::IgnoreNulls;
let (agg_expr, filter, order_by) = match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
let ordering_reqs = order_by.clone().unwrap_or(vec![]);
let physical_sort_exprs = try_create_physical_sort_expr(order_by)?;
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);
let agg_expr = aggregates::create_aggregate_expr(
fun,
*distinct,
@@ -1690,16 +1700,23 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
name,
ignore_nulls,
)?;
(agg_expr, filter, order_by)
(agg_expr, filter, physical_sort_exprs)
}
AggregateFunctionDefinition::UDF(fun) => {
let sort_exprs = order_by.clone().unwrap_or(vec![]);
let physical_sort_exprs = try_create_physical_sort_expr(order_by)?;
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);
let agg_expr = udaf::create_aggregate_expr(
fun,
&args,
&sort_exprs,
&ordering_reqs,
physical_input_schema,
name,
);
(agg_expr?, filter, order_by)
ignore_nulls,
)?;
(agg_expr, filter, physical_sort_exprs)
}
AggregateFunctionDefinition::Name(_) => {
return internal_err!(
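
The `try_create_physical_sort_expr` closure in this file turns an optional list of logical expressions into an optional list of physical ones, failing if any single conversion fails. The sketch below shows the same `Option<Vec<_>>` to `Result<Option<Vec<_>>>` shape using `Option::map` plus `transpose` instead of an explicit `match`. It is a toy: `convert` merely parses integers and stands in for the real per-expression conversion, and both function names are hypothetical.

```rust
/// Toy conversion standing in for the per-expression step; it parses a
/// string into a number and fails on bad input.
fn convert(expr: &str) -> Result<i64, String> {
    expr.trim()
        .parse::<i64>()
        .map_err(|e| format!("cannot convert {:?}: {}", expr, e))
}

/// `None` stays `None`; `Some(exprs)` converts every element and stops at
/// the first error.
fn convert_all(order_by: &Option<Vec<String>>) -> Result<Option<Vec<i64>>, String> {
    order_by
        .as_ref()
        .map(|exprs| {
            exprs
                .iter()
                .map(|e| convert(e))
                .collect::<Result<Vec<_>, _>>()
        })
        // Option<Result<T, E>> -> Result<Option<T>, E>
        .transpose()
}

fn main() {
    let ordered = Some(vec!["1".to_string(), " 2 ".to_string()]);
    println!("{:?}", convert_all(&None));
    println!("{:?}", convert_all(&ordered));
    println!("{:?}", convert_all(&Some(vec!["x".to_string()])));
}
```

Whether `transpose` reads better than the explicit `match` is a matter of taste; both keep the "no ORDER BY" case distinct from "ORDER BY with zero expressions".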
5 changes: 3 additions & 2 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
@@ -45,7 +45,8 @@ use datafusion::{
};
use datafusion_common::{assert_contains, cast::as_primitive_array, exec_err};
use datafusion_expr::{
create_udaf, AggregateUDFImpl, GroupsAccumulator, SimpleAggregateUDF,
create_udaf, function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator,
SimpleAggregateUDF,
};
use datafusion_physical_expr::expressions::AvgAccumulator;

@@ -717,7 +718,7 @@ impl AggregateUDFImpl for TestGroupsAccumulator {
Ok(DataType::UInt64)
}

fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
// should use groups accumulator
panic!("accumulator shouldn't invoke");
}
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr.rs
@@ -577,14 +577,15 @@ impl AggregateFunction {
distinct: bool,
filter: Option<Box<Expr>>,
order_by: Option<Vec<Expr>>,
null_treatment: Option<NullTreatment>,
) -> Self {
Self {
func_def: AggregateFunctionDefinition::UDF(udf),
args,
distinct,
filter,
order_by,
null_treatment: None,
null_treatment,
}
}
}
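
With `null_treatment` now passed through the UDF constructor rather than hard-coded to `None`, the planner can derive an `ignore_nulls` flag from it, treating a missing clause as RESPECT NULLS. The sketch below illustrates that derivation and one plausible effect on a first-value computation. `NullTreatment` here is a local toy enum rather than the `sqlparser` type, and `first_value` is a hypothetical helper over an already ordered column, not the accumulator from this PR.

```rust
/// Toy mirror of the SQL null-treatment clause; not the `sqlparser` type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum NullTreatment {
    IgnoreNulls,
    RespectNulls,
}

/// An absent clause is treated as RESPECT NULLS.
fn ignore_nulls(null_treatment: Option<NullTreatment>) -> bool {
    null_treatment.unwrap_or(NullTreatment::RespectNulls) == NullTreatment::IgnoreNulls
}

/// Hypothetical first-value helper over an already ordered column.
fn first_value(values: &[Option<i64>], ignore_nulls: bool) -> Option<i64> {
    if ignore_nulls {
        // Skip leading nulls and take the first non-null value.
        values.iter().flatten().next().copied()
    } else {
        // Take the first row as is, even if it is null.
        values.first().copied().flatten()
    }
}

fn main() {
    let column: [Option<i64>; 3] = [None, Some(3), Some(5)];
    for treatment in [
        None,
        Some(NullTreatment::RespectNulls),
        Some(NullTreatment::IgnoreNulls),
    ] {
        let flag = ignore_nulls(treatment);
        println!("{:?} -> {:?}", treatment, first_value(&column, flag));
    }
}
```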
107 changes: 99 additions & 8 deletions datafusion/expr/src/expr_fn.rs
@@ -21,16 +21,18 @@ use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
Placeholder, ScalarFunction, TryCast,
};
use crate::function::PartitionEvaluatorFactory;
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
};
use crate::udaf::format_state_name;
use crate::{
aggregate_function, built_in_function, conditional_expressions::CaseBuilder,
logical_plan::Subquery, AccumulatorFactoryFunction, AggregateUDF,
BuiltinScalarFunction, Expr, LogicalPlan, Operator, ScalarFunctionImplementation,
ScalarUDF, Signature, Volatility,
logical_plan::Subquery, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan,
Operator, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
};
use crate::{AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl};
use arrow::datatypes::DataType;
use datafusion_common::{Column, Result};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{internal_err, Column, Result};
use std::any::Any;
use std::fmt::Debug;
use std::ops::Not;
@@ -719,6 +721,16 @@ pub fn create_udaf(
))
}

/// Creates a new UDAF with a specific signature, state type and return type.
/// The signature and state type must match the `Accumulator's implementation`.
Contributor

It might make sense to add a comment explaining that this is a temporary solution (i.e. the idea is that we'll pull the function out into its own crate, but for now we need to keep the physical implementation separate).

pub fn create_first_value(
name: &str,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> AggregateUDF {
AggregateUDF::from(FirstValue::new(name, signature, accumulator))
}

/// Implements [`AggregateUDFImpl`] for functions that have a single signature and
/// return type.
pub struct SimpleAggregateUDF {
@@ -796,15 +808,94 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
Ok(self.return_type.clone())
}

fn accumulator(&self, arg: &DataType) -> Result<Box<dyn crate::Accumulator>> {
(self.accumulator)(arg)
fn accumulator(
&self,
acc_args: AccumulatorArgs,
) -> Result<Box<dyn crate::Accumulator>> {
(self.accumulator)(acc_args)
}

fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
Ok(self.state_type.clone())
}
}

pub struct FirstValue {
Contributor

Very nice!

I think we should put this code in a new crate (maybe datafusion-functions-aggregates) to ensure the UDF API is the only way to run such functions. We could do that in a follow-on PR, though.

Contributor Author

jayzhan211, Mar 31, 2024

I tried moving it to datafusion-functions-aggregates, but I am not sure how to avoid importing physical-expr: it is used extensively, including PhysicalSortExpr and AggregateExpr. 🤔

Contributor

I see -- it seems like the issue is that the accumulator implementation requires PhysicalSortExpr.

To pull the code into its own crate maybe we could pull out the relevant pieces of datafusion-physical-expr into datafusion-physical-core or something (as a follow on PR)

name: String,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
}

impl Debug for FirstValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("FirstValue")
.field("name", &self.name)
.field("signature", &self.signature)
.field("fun", &"<FUNC>")
Contributor

Suggested change
- .field("fun", &"<FUNC>")
+ .field("accumulator", &"<FUNC>")

.finish()
}
}
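
The suggestion above concerns a hand-written `Debug` impl for a struct that stores a function value. Closures and `dyn Fn` objects have no `Debug` impl, so a placeholder is printed instead, and labeling it with the real field name keeps the output aligned with the struct definition. A minimal sketch, assuming a toy `Factory` alias and a `FirstValueLike` struct that are both hypothetical:

```rust
use std::fmt;
use std::sync::Arc;

/// Hypothetical factory type; a stand-in for `AccumulatorFactoryFunction`.
type Factory = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

struct FirstValueLike {
    name: String,
    accumulator: Factory,
}

impl fmt::Debug for FirstValueLike {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FirstValueLike")
            .field("name", &self.name)
            // `dyn Fn` has no `Debug` impl, so print a placeholder under
            // the field's actual name.
            .field("accumulator", &"<FUNC>")
            .finish()
    }
}

fn main() {
    let udaf = FirstValueLike {
        name: "first_value".to_string(),
        accumulator: Arc::new(|x: i64| x),
    };
    println!("{:?}", udaf);
    println!("{}", (udaf.accumulator)(7));
}
```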

impl FirstValue {
pub fn new(
name: impl Into<String>,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> Self {
let name = name.into();
Self {
name,
signature,
accumulator,
}
}
}

impl AggregateUDFImpl for FirstValue {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn accumulator(
&self,
acc_args: AccumulatorArgs,
) -> Result<Box<dyn crate::Accumulator>> {
(self.accumulator)(acc_args)
}

fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
internal_err!("FirstValue does not have a state type")
}

fn state_fields(
&self,
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
) -> Result<Vec<Field>> {
let mut fields = vec![Field::new(
format_state_name(name, "first_value"),
value_type,
true,
)];
fields.extend(ordering_fields);
fields.push(Field::new("is_set", DataType::Boolean, true));
Ok(fields)
}
}
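
The `state_fields` method above fixes the layout of the intermediate state: the candidate first value, then one field per ordering expression, then an `is_set` flag. The sketch below reproduces that layout with a toy `Field` type so the resulting order is easy to see. `Field` here is a hypothetical stand-in for the Arrow type, data types are plain strings, and the `name[state]` naming scheme in `format_state_name` is an assumption made for the example.

```rust
/// Toy field description; a hypothetical stand-in for Arrow's `Field`.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
}

impl Field {
    fn new(name: &str, data_type: &str, nullable: bool) -> Self {
        Field {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
        }
    }
}

/// Assumed naming scheme, for illustration: "<aggregate name>[<state name>]".
fn format_state_name(name: &str, state_name: &str) -> String {
    format!("{}[{}]", name, state_name)
}

/// Same layout as in the diff: value, then ordering fields, then `is_set`.
fn state_fields(name: &str, value_type: &str, ordering_fields: Vec<Field>) -> Vec<Field> {
    let mut fields = vec![Field::new(
        &format_state_name(name, "first_value"),
        value_type,
        true,
    )];
    fields.extend(ordering_fields);
    fields.push(Field::new("is_set", "Boolean", true));
    fields
}

fn main() {
    let ordering = vec![Field::new("ts", "Timestamp", true)];
    for field in state_fields("FIRST_VALUE(x)", "Int64", ordering) {
        println!("{:?}", field);
    }
}
```

Keeping the ordering fields in the state is what lets partial results from different partitions be merged correctly: each partial state carries the sort key of its candidate value.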

/// Creates a new UDWF with a specific signature, state type and return type.
///
/// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.