diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index e6bfc67eda81..030536f9f830 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -34,6 +34,7 @@ use datafusion::logical_expr::{ ExprSchemable, LogicalPlan, Operator, Projection, SortExpr, Values, }; use substrait::proto::expression::subquery::set_predicate::PredicateOp; +use substrait::proto::expression_reference::ExprType; use url::Url; use crate::extensions::Extensions; @@ -96,7 +97,7 @@ use substrait::proto::{ sort_field::{SortDirection, SortKind::*}, AggregateFunction, Expression, NamedStruct, Plan, Rel, Type, }; -use substrait::proto::{FunctionArgument, SortField}; +use substrait::proto::{ExtendedExpression, FunctionArgument, SortField}; // Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which // is the same as the expectation for any non-empty timezone in DF, so any non-empty timezone @@ -251,6 +252,81 @@ pub async fn from_substrait_plan( } } +/// An ExprContainer is a container for a collection of expressions with a common input schema +/// +/// In addition, each expression is associated with a field, which defines the +/// expression's output. The data type and nullability of the field are calculated from the +/// expression and the input schema. However the names of the field (and its nested fields) are +/// derived from the Substrait message. +pub struct ExprContainer { + /// The input schema for the expressions + pub input_schema: DFSchemaRef, + /// The expressions + /// + /// Each item contains an expression and the field that defines the expected nullability and name of the expr's output + pub exprs: Vec<(Expr, Field)>, +} + +/// Convert Substrait ExtendedExpression to ExprContainer +/// +/// A Substrait ExtendedExpression message contains one or more expressions, +/// with names for the outputs, and an input schema. These pieces are all included +/// in the ExprContainer. +/// +/// This is a top-level message and can be used to send expressions (not plans) +/// between systems. This is often useful for scenarios like pushdown where filter +/// expressions need to be sent to remote systems. +pub async fn from_substrait_extended_expr( + ctx: &SessionContext, + extended_expr: &ExtendedExpression, +) -> Result { + // Register function extension + let extensions = Extensions::try_from(&extended_expr.extensions)?; + if !extensions.type_variations.is_empty() { + return not_impl_err!("Type variation extensions are not supported"); + } + + let input_schema = DFSchemaRef::new(match &extended_expr.base_schema { + Some(base_schema) => from_substrait_named_struct(base_schema, &extensions), + None => { + plan_err!("required property `base_schema` missing from Substrait ExtendedExpression message") + } + }?); + + // Parse expressions + let mut exprs = Vec::with_capacity(extended_expr.referred_expr.len()); + for (expr_idx, substrait_expr) in extended_expr.referred_expr.iter().enumerate() { + let scalar_expr = match &substrait_expr.expr_type { + Some(ExprType::Expression(scalar_expr)) => Ok(scalar_expr), + Some(ExprType::Measure(_)) => { + not_impl_err!("Measure expressions are not yet supported") + } + None => { + plan_err!("required property `expr_type` missing from Substrait ExpressionReference message") + } + }?; + let expr = + from_substrait_rex(ctx, scalar_expr, &input_schema, &extensions).await?; + let (output_type, expected_nullability) = + expr.data_type_and_nullable(&input_schema)?; + let output_field = Field::new("", output_type, expected_nullability); + let mut names_idx = 0; + let output_field = rename_field( + &output_field, + &substrait_expr.output_names, + expr_idx, + &mut names_idx, + /*rename_self=*/ true, + )?; + exprs.push((expr, output_field)); + } + + Ok(ExprContainer { + input_schema, + exprs, + }) +} + /// parse projection pub fn extract_projection( t: LogicalPlan, @@ -334,6 +410,68 @@ fn rename_expressions( .collect() } +fn rename_field( + field: &Field, + dfs_names: &Vec, + unnamed_field_suffix: usize, // If Substrait doesn't provide a name, we'll use this "c{unnamed_field_suffix}" + name_idx: &mut usize, // Index into dfs_names + rename_self: bool, // Some fields (e.g. list items) don't have names in Substrait and this will be false to keep old name +) -> Result { + let name = if rename_self { + next_struct_field_name(unnamed_field_suffix, dfs_names, name_idx)? + } else { + field.name().to_string() + }; + match field.data_type() { + DataType::Struct(children) => { + let children = children + .iter() + .enumerate() + .map(|(child_idx, f)| { + rename_field( + f.as_ref(), + dfs_names, + child_idx, + name_idx, + /*rename_self=*/ true, + ) + }) + .collect::>()?; + Ok(field + .to_owned() + .with_name(name) + .with_data_type(DataType::Struct(children))) + } + DataType::List(inner) => { + let renamed_inner = rename_field( + inner.as_ref(), + dfs_names, + 0, + name_idx, + /*rename_self=*/ false, + )?; + Ok(field + .to_owned() + .with_data_type(DataType::List(FieldRef::new(renamed_inner))) + .with_name(name)) + } + DataType::LargeList(inner) => { + let renamed_inner = rename_field( + inner.as_ref(), + dfs_names, + 0, + name_idx, + /*rename_self= */ false, + )?; + Ok(field + .to_owned() + .with_data_type(DataType::LargeList(FieldRef::new(renamed_inner))) + .with_name(name)) + } + _ => Ok(field.to_owned().with_name(name)), + } +} + /// Produce a version of the given schema with names matching the given list of names. /// Substrait doesn't deal with column (incl. nested struct field) names within the schema, /// but it does give us the list of expected names at the end of the plan, so we use this @@ -342,59 +480,20 @@ fn make_renamed_schema( schema: &DFSchemaRef, dfs_names: &Vec, ) -> Result { - fn rename_inner_fields( - dtype: &DataType, - dfs_names: &Vec, - name_idx: &mut usize, - ) -> Result { - match dtype { - DataType::Struct(fields) => { - let fields = fields - .iter() - .map(|f| { - let name = next_struct_field_name(0, dfs_names, name_idx)?; - Ok((**f).to_owned().with_name(name).with_data_type( - rename_inner_fields(f.data_type(), dfs_names, name_idx)?, - )) - }) - .collect::>()?; - Ok(DataType::Struct(fields)) - } - DataType::List(inner) => Ok(DataType::List(FieldRef::new( - (**inner).to_owned().with_data_type(rename_inner_fields( - inner.data_type(), - dfs_names, - name_idx, - )?), - ))), - DataType::LargeList(inner) => Ok(DataType::LargeList(FieldRef::new( - (**inner).to_owned().with_data_type(rename_inner_fields( - inner.data_type(), - dfs_names, - name_idx, - )?), - ))), - _ => Ok(dtype.to_owned()), - } - } - let mut name_idx = 0; let (qualifiers, fields): (_, Vec) = schema .iter() - .map(|(q, f)| { - let name = next_struct_field_name(0, dfs_names, &mut name_idx)?; - Ok(( - q.cloned(), - (**f) - .to_owned() - .with_name(name) - .with_data_type(rename_inner_fields( - f.data_type(), - dfs_names, - &mut name_idx, - )?), - )) + .enumerate() + .map(|(field_idx, (q, f))| { + let renamed_f = rename_field( + f.as_ref(), + dfs_names, + field_idx, + &mut name_idx, + /*rename_self=*/ true, + )?; + Ok((q.cloned(), renamed_f)) }) .collect::>>()? .into_iter() @@ -1681,14 +1780,14 @@ fn from_substrait_struct_type( } fn next_struct_field_name( - i: usize, + column_idx: usize, dfs_names: &[String], name_idx: &mut usize, ) -> Result { if dfs_names.is_empty() { // If names are not given, create dummy names // c0, c1, ... align with e.g. SqlToRel::create_named_struct - Ok(format!("c{i}")) + Ok(format!("c{column_idx}")) } else { let name = dfs_names.get(*name_idx).cloned().ok_or_else(|| { substrait_datafusion_err!("Named schema must contain names for all fields") diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index fada827875b0..1165ce13d236 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use itertools::Itertools; use std::sync::Arc; +use substrait::proto::expression_reference::ExprType; use arrow_buffer::ToByteSlice; -use datafusion::arrow::datatypes::IntervalUnit; +use datafusion::arrow::datatypes::{Field, IntervalUnit}; use datafusion::logical_expr::{ CrossJoin, Distinct, Like, Partitioning, WindowFrameUnits, }; @@ -63,7 +63,9 @@ use substrait::proto::expression::window_function::BoundsType; use substrait::proto::read_rel::VirtualTable; use substrait::proto::rel_common::EmitKind; use substrait::proto::rel_common::EmitKind::Emit; -use substrait::proto::{rel_common, CrossRel, ExchangeRel, RelCommon}; +use substrait::proto::{ + rel_common, CrossRel, ExchangeRel, ExpressionReference, ExtendedExpression, RelCommon, +}; use substrait::{ proto::{ aggregate_function::AggregationInvocation, @@ -119,6 +121,56 @@ pub fn to_substrait_plan(plan: &LogicalPlan, ctx: &SessionContext) -> Result Result> { + let mut extensions = Extensions::default(); + + let substrait_exprs = exprs + .iter() + .map(|(expr, field)| { + let substrait_expr = to_substrait_rex( + ctx, + expr, + schema, + /*col_ref_offset=*/ 0, + &mut extensions, + )?; + let mut output_names = Vec::new(); + flatten_names(field, false, &mut output_names)?; + Ok(ExpressionReference { + output_names, + expr_type: Some(ExprType::Expression(substrait_expr)), + }) + }) + .collect::>>()?; + let substrait_schema = to_substrait_named_struct(schema, &mut extensions)?; + + Ok(Box::new(ExtendedExpression { + advanced_extensions: None, + expected_type_urls: vec![], + extension_uris: vec![], + extensions: extensions.into(), + version: Some(version::version_with_producer("datafusion")), + referred_expr: substrait_exprs, + base_schema: Some(substrait_schema), + })) +} + /// Convert DataFusion LogicalPlan to Substrait Rel pub fn to_substrait_rel( plan: &LogicalPlan, @@ -580,50 +632,43 @@ fn create_project_remapping(expr_count: usize, input_field_count: usize) -> Emit Emit(rel_common::Emit { output_mapping }) } +// Substrait wants a list of all field names, including nested fields from structs, +// also from within e.g. lists and maps. However, it does not want the list and map field names +// themselves - only proper structs fields are considered to have useful names. +fn flatten_names(field: &Field, skip_self: bool, names: &mut Vec) -> Result<()> { + if !skip_self { + names.push(field.name().to_string()); + } + match field.data_type() { + DataType::Struct(fields) => { + for field in fields { + flatten_names(field, false, names)?; + } + Ok(()) + } + DataType::List(l) => flatten_names(l, true, names), + DataType::LargeList(l) => flatten_names(l, true, names), + DataType::Map(m, _) => match m.data_type() { + DataType::Struct(key_and_value) if key_and_value.len() == 2 => { + flatten_names(&key_and_value[0], true, names)?; + flatten_names(&key_and_value[1], true, names) + } + _ => plan_err!("Map fields must contain a Struct with exactly 2 fields"), + }, + _ => Ok(()), + }?; + Ok(()) +} + fn to_substrait_named_struct( schema: &DFSchemaRef, extensions: &mut Extensions, ) -> Result { - // Substrait wants a list of all field names, including nested fields from structs, - // also from within e.g. lists and maps. However, it does not want the list and map field names - // themselves - only proper structs fields are considered to have useful names. - fn names_dfs(dtype: &DataType) -> Result> { - match dtype { - DataType::Struct(fields) => { - let mut names = Vec::new(); - for field in fields { - names.push(field.name().to_string()); - names.extend(names_dfs(field.data_type())?); - } - Ok(names) - } - DataType::List(l) => names_dfs(l.data_type()), - DataType::LargeList(l) => names_dfs(l.data_type()), - DataType::Map(m, _) => match m.data_type() { - DataType::Struct(key_and_value) if key_and_value.len() == 2 => { - let key_names = - names_dfs(key_and_value.first().unwrap().data_type())?; - let value_names = - names_dfs(key_and_value.last().unwrap().data_type())?; - Ok([key_names, value_names].concat()) - } - _ => plan_err!("Map fields must contain a Struct with exactly 2 fields"), - }, - _ => Ok(Vec::new()), - } + let mut names = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + flatten_names(field, false, &mut names)?; } - let names = schema - .fields() - .iter() - .map(|f| { - let mut names = vec![f.name().to_string()]; - names.extend(names_dfs(f.data_type())?); - Ok(names) - }) - .flatten_ok() - .collect::>()?; - let field_types = r#type::Struct { types: schema .fields() @@ -2178,14 +2223,16 @@ fn substrait_field_ref(index: usize) -> Result { mod test { use super::*; use crate::logical_plan::consumer::{ - from_substrait_literal_without_names, from_substrait_type_without_names, + from_substrait_extended_expr, from_substrait_literal_without_names, + from_substrait_named_struct, from_substrait_type_without_names, }; use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion::arrow::array::{ GenericListArray, Int64Builder, MapBuilder, StringBuilder, }; - use datafusion::arrow::datatypes::Field; + use datafusion::arrow::datatypes::{Field, Fields, Schema}; use datafusion::common::scalar::ScalarStructBuilder; + use datafusion::common::DFSchema; use std::collections::HashMap; #[test] @@ -2461,4 +2508,101 @@ mod test { Ok(()) } + + #[test] + fn named_struct_names() -> Result<()> { + let mut extensions = Extensions::default(); + let schema = DFSchemaRef::new(DFSchema::try_from(Schema::new(vec![ + Field::new("int", DataType::Int32, true), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new( + "inner", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + )])), + true, + ), + Field::new("trailer", DataType::Float64, true), + ]))?); + + let named_struct = to_substrait_named_struct(&schema, &mut extensions)?; + + // Struct field names should be flattened DFS style + // List field names should be omitted + assert_eq!( + named_struct.names, + vec!["int", "struct", "inner", "trailer"] + ); + + let roundtrip_schema = from_substrait_named_struct(&named_struct, &extensions)?; + assert_eq!(schema.as_ref(), &roundtrip_schema); + Ok(()) + } + + #[tokio::test] + async fn extended_expressions() -> Result<()> { + let ctx = SessionContext::new(); + + // One expression, empty input schema + let expr = Expr::Literal(ScalarValue::Int32(Some(42))); + let field = Field::new("out", DataType::Int32, false); + let empty_schema = DFSchemaRef::new(DFSchema::empty()); + let substrait = + to_substrait_extended_expr(&[(&expr, &field)], &empty_schema, &ctx)?; + let roundtrip_expr = from_substrait_extended_expr(&ctx, &substrait).await?; + + assert_eq!(roundtrip_expr.input_schema, empty_schema); + assert_eq!(roundtrip_expr.exprs.len(), 1); + + let (rt_expr, rt_field) = roundtrip_expr.exprs.first().unwrap(); + assert_eq!(rt_field, &field); + assert_eq!(rt_expr, &expr); + + // Multiple expressions, with column references + let expr1 = Expr::Column("c0".into()); + let expr2 = Expr::Column("c1".into()); + let out1 = Field::new("out1", DataType::Int32, true); + let out2 = Field::new("out2", DataType::Utf8, true); + let input_schema = DFSchemaRef::new(DFSchema::try_from(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + ]))?); + + let substrait = to_substrait_extended_expr( + &[(&expr1, &out1), (&expr2, &out2)], + &input_schema, + &ctx, + )?; + let roundtrip_expr = from_substrait_extended_expr(&ctx, &substrait).await?; + + assert_eq!(roundtrip_expr.input_schema, input_schema); + assert_eq!(roundtrip_expr.exprs.len(), 2); + + let mut exprs = roundtrip_expr.exprs.into_iter(); + + let (rt_expr, rt_field) = exprs.next().unwrap(); + assert_eq!(rt_field, out1); + assert_eq!(rt_expr, expr1); + + let (rt_expr, rt_field) = exprs.next().unwrap(); + assert_eq!(rt_field, out2); + assert_eq!(rt_expr, expr2); + + Ok(()) + } + + #[tokio::test] + async fn invalid_extended_expression() { + let ctx = SessionContext::new(); + + // Not ok if input schema is missing field referenced by expr + let expr = Expr::Column("missing".into()); + let field = Field::new("out", DataType::Int32, false); + let empty_schema = DFSchemaRef::new(DFSchema::empty()); + + let err = to_substrait_extended_expr(&[(&expr, &field)], &empty_schema, &ctx); + + assert!(matches!(err, Err(DataFusionError::SchemaError(_, _)))); + } }