Skip to content

Commit

Permalink
fix: When consuming Substrait, temporarily rename clashing duplicate …
Browse files Browse the repository at this point in the history
…columns (apache#11329)

* cleanup project internals

* alias intermediate duplicate columns

* fix test

* fix clippy
  • Loading branch information
Blizzara authored Jul 8, 2024
1 parent 37428bb commit 894a879
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
33 changes: 22 additions & 11 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use datafusion::{
prelude::{Column, SessionContext},
scalar::ScalarValue,
};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use substrait::proto::exchange_rel::ExchangeKind;
Expand Down Expand Up @@ -404,22 +404,33 @@ pub async fn from_substrait_rel(
let mut input = LogicalPlanBuilder::from(
from_substrait_rel(ctx, input, extensions).await?,
);
let mut names: HashSet<String> = HashSet::new();
let mut exprs: Vec<Expr> = vec![];
for e in &p.expressions {
let x =
from_substrait_rex(ctx, e, input.clone().schema(), extensions)
.await?;
// if the expression is WindowFunction, wrap in a Window relation
// before returning and do not add to list of this Projection's expression list
// otherwise, add expression to the Projection's expression list
match &*x {
Expr::WindowFunction(_) => {
input = input.window(vec![x.as_ref().clone()])?;
exprs.push(x.as_ref().clone());
}
_ => {
exprs.push(x.as_ref().clone());
}
if let Expr::WindowFunction(_) = x.as_ref() {
// Adding the same expression here and in the project below
// works because the project's builder uses columnize_expr(..)
// to transform it into a column reference
input = input.window(vec![x.as_ref().clone()])?
}
// Ensure the expression has a unique display name, so that project's
// validate_unique_names doesn't fail
let name = x.display_name()?;
let mut new_name = name.clone();
let mut i = 0;
while names.contains(&new_name) {
new_name = format!("{}__temp__{}", name, i);
i += 1;
}
names.insert(new_name.clone());
if new_name != name {
exprs.push(x.as_ref().clone().alias(new_name.clone()));
} else {
exprs.push(x.as_ref().clone());
}
}
input.project(exprs)?.build()
Expand Down
16 changes: 16 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,22 @@ async fn roundtrip_values_duplicate_column_join() -> Result<()> {
.await
}

#[tokio::test]
async fn duplicate_column() -> Result<()> {
// Substrait does not keep column names (aliases) in the plan, rather it operates on column indices
// only. DataFusion however, is strict about not having duplicate column names appear in the plan.
// This test confirms that we generate aliases for columns in the plan which would otherwise have
// colliding names.
assert_expected_plan(
"SELECT a + 1 as sum_a, a + 1 as sum_a_2 FROM data",
"Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)__temp__0 AS sum_a_2\
\n Projection: data.a + Int64(1)\
\n TableScan: data projection=[a]",
true,
)
.await
}

/// Construct a plan that cast columns. Only those SQL types are supported for now.
#[tokio::test]
async fn new_test_grammar() -> Result<()> {
Expand Down

0 comments on commit 894a879

Please sign in to comment.