diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 905475eaca7a..77fd5fe44d44 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -60,7 +60,7 @@ use datafusion::{ prelude::{Column, SessionContext}, scalar::ScalarValue, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use substrait::proto::exchange_rel::ExchangeKind; @@ -404,22 +404,33 @@ pub async fn from_substrait_rel( let mut input = LogicalPlanBuilder::from( from_substrait_rel(ctx, input, extensions).await?, ); + let mut names: HashSet = HashSet::new(); let mut exprs: Vec = vec![]; for e in &p.expressions { let x = from_substrait_rex(ctx, e, input.clone().schema(), extensions) .await?; // if the expression is WindowFunction, wrap in a Window relation - // before returning and do not add to list of this Projection's expression list - // otherwise, add expression to the Projection's expression list - match &*x { - Expr::WindowFunction(_) => { - input = input.window(vec![x.as_ref().clone()])?; - exprs.push(x.as_ref().clone()); - } - _ => { - exprs.push(x.as_ref().clone()); - } + if let Expr::WindowFunction(_) = x.as_ref() { + // Adding the same expression here and in the project below + // works because the project's builder uses columnize_expr(..) + // to transform it into a column reference + input = input.window(vec![x.as_ref().clone()])? + } + // Ensure the expression has a unique display name, so that project's + // validate_unique_names doesn't fail + let name = x.display_name()?; + let mut new_name = name.clone(); + let mut i = 0; + while names.contains(&new_name) { + new_name = format!("{}__temp__{}", name, i); + i += 1; + } + names.insert(new_name.clone()); + if new_name != name { + exprs.push(x.as_ref().clone().alias(new_name.clone())); + } else { + exprs.push(x.as_ref().clone()); } } input.project(exprs)?.build() diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 52cfa50683a0..2893b1a31a26 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -751,6 +751,22 @@ async fn roundtrip_values_duplicate_column_join() -> Result<()> { .await } +#[tokio::test] +async fn duplicate_column() -> Result<()> { + // Substrait does not keep column names (aliases) in the plan, rather it operates on column indices + // only. DataFusion however, is strict about not having duplicate column names appear in the plan. + // This test confirms that we generate aliases for columns in the plan which would otherwise have + // colliding names. + assert_expected_plan( + "SELECT a + 1 as sum_a, a + 1 as sum_a_2 FROM data", + "Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)__temp__0 AS sum_a_2\ + \n Projection: data.a + Int64(1)\ + \n TableScan: data projection=[a]", + true, + ) + .await +} + /// Construct a plan that cast columns. Only those SQL types are supported for now. #[tokio::test] async fn new_test_grammar() -> Result<()> {