Skip to content

Commit

Permalink
fix: preserve column qualifier for DataFrame::with_column (apache#7792)
Browse files Browse the repository at this point in the history

* fix: preserve column qualifier for `DataFrame::with_column`

* fix test variable

* review feedback: add self join test
  • Loading branch information
jonahgao authored and devinjdangelo committed Oct 11, 2023
1 parent dceeb98 commit 257a22e
Showing 1 changed file with 126 additions and 4 deletions.
130 changes: 126 additions & 4 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,10 +1144,7 @@ impl DataFrame {
col_exists = true;
new_column.clone()
} else {
Expr::Column(Column {
relation: None,
name: f.name().into(),
})
col(f.qualified_column())
}
})
.collect();
Expand Down Expand Up @@ -1855,6 +1852,131 @@ mod tests {
Ok(())
}

// Regression test for https://github.com/apache/arrow-datafusion/issues/7790
// Joining `t1` with `t2` produces two output columns that share the name `c1` but carry
// different relation qualifiers; the plan built by `with_column` is expected to keep both
// qualified columns (`t1.c1`, `t2.c1`) next to the newly added column.
#[tokio::test]
async fn with_column_join_same_columns() -> Result<()> {
    let source = test_table().await?.select_columns(&["c1"])?;
    let ctx = SessionContext::new();

    // Register the same single-column view under two different table names.
    let view = source.into_view();
    ctx.register_table("t1", view.clone())?;
    ctx.register_table("t2", view)?;

    let left = ctx.table("t1").await?;
    let right = ctx.table("t2").await?;
    // make the test deterministic
    let ordering = vec![col("t1.c1").sort(true, true)];
    let df = left
        .join(right, JoinType::Inner, &["c1"], &["c1"], None)?
        .sort(ordering)?
        .limit(0, Some(1))?;

    // The joined frame exposes both `c1` columns.
    let joined_batches = df.clone().collect().await?;
    let expected_joined = [
        "+----+----+",
        "| c1 | c1 |",
        "+----+----+",
        "| a | a |",
        "+----+----+",
    ];
    assert_batches_sorted_eq!(expected_joined, &joined_batches);

    let df_with_column = df.clone().with_column("new_column", lit(true))?;

    // Unoptimized logical plan: the projection lists both qualified columns.
    let expected_logical = "\
        Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
        \n Limit: skip=0, fetch=1\
        \n Sort: t1.c1 ASC NULLS FIRST\
        \n Inner Join: t1.c1 = t2.c1\
        \n TableScan: t1\
        \n TableScan: t2";
    let actual_logical = format!("{:?}", df_with_column.logical_plan());
    assert_eq!(expected_logical, actual_logical);

    // Optimized plan: same projection, with the views expanded to the underlying scans.
    let expected_optimized = "\
        Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
        \n Limit: skip=0, fetch=1\
        \n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
        \n Inner Join: t1.c1 = t2.c1\
        \n SubqueryAlias: t1\
        \n TableScan: aggregate_test_100 projection=[c1]\
        \n SubqueryAlias: t2\
        \n TableScan: aggregate_test_100 projection=[c1]";
    let actual_optimized = format!("{:?}", df_with_column.clone().into_optimized_plan()?);
    assert_eq!(expected_optimized, actual_optimized);

    // Executing the plan yields both `c1` columns plus the literal column.
    let final_batches = df_with_column.collect().await?;
    let expected_final = [
        "+----+----+------------+",
        "| c1 | c1 | new_column |",
        "+----+----+------------+",
        "| a | a | true |",
        "+----+----+------------+",
    ];
    assert_batches_sorted_eq!(expected_final, &final_batches);

    Ok(())
}

// Self join of table `t1`.
// Companion to the test for https://github.com/apache/arrow-datafusion/issues/7790:
// here both join inputs are `t1`, so the two `c1` output columns share the same
// qualified name and `with_column` is expected to fail with a duplicate-name planning error.
#[tokio::test]
async fn with_column_self_join() -> Result<()> {
    let source = test_table().await?.select_columns(&["c1"])?;
    let ctx = SessionContext::new();

    ctx.register_table("t1", source.into_view())?;

    let left = ctx.table("t1").await?;
    let right = ctx.table("t1").await?;
    // make the test deterministic
    let ordering = vec![col("t1.c1").sort(true, true)];
    let df = left
        .join(right, JoinType::Inner, &["c1"], &["c1"], None)?
        .sort(ordering)?
        .limit(0, Some(1))?;

    // The join itself still executes and returns both columns.
    let joined_batches = df.clone().collect().await?;
    let expected_joined = [
        "+----+----+",
        "| c1 | c1 |",
        "+----+----+",
        "| a | a |",
        "+----+----+",
    ];
    assert_batches_sorted_eq!(expected_joined, &joined_batches);

    // Adding a column rebuilds the projection, which rejects the two identically named inputs.
    let expected_err = "Error during planning: Projections require unique expression names \
        but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \
        Consider aliasing (\"AS\") one of them.";
    let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err();
    assert_eq!(actual_err.strip_backtrace(), expected_err);

    Ok(())
}

#[tokio::test]
async fn with_column_renamed() -> Result<()> {
let df = test_table()
Expand Down

0 comments on commit 257a22e

Please sign in to comment.