Skip to content

Commit

Permalink
Support CREATE OR REPLACE TABLE (#2613)
Browse files Browse the repository at this point in the history
* support create or replace table ...

* 1. add test case for 'create or replace table'. 2. do not allow 'IF NOT EXISTS' coexist with 'REPLACE'

* refine the code format
  • Loading branch information
HuSen8891 authored May 28, 2022
1 parent 7b7edf9 commit df2094f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
27 changes: 23 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,17 @@ impl SessionContext {
name,
input,
if_not_exists,
or_replace,
}) => {
let table = self.table(name.as_str());

match (if_not_exists, table) {
(true, Ok(_)) => {
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(_, Err(_)) => {
(false, true, Ok(_)) => {
self.deregister_table(name.as_str())?;
let plan = self.optimize(&input)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &plan));
Expand All @@ -312,7 +314,24 @@ impl SessionContext {
self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
(true, true, Ok(_)) => Err(DataFusionError::Internal(
"'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
)),
(_, _, Err(_)) => {
let plan = self.optimize(&input)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &plan));

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(
Arc::new(plan.schema().as_ref().into()),
batches,
)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
Expand Down
47 changes: 47 additions & 0 deletions datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,53 @@ async fn create_table_as() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn create_or_replace_table_as() -> Result<()> {
// the information schema used to introduce cyclic Arcs
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

// Create table
ctx.sql("CREATE TABLE y AS VALUES (1,2),(3,4)")
.await
.unwrap()
.collect()
.await
.unwrap();

// Replace table
ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)")
.await
.unwrap()
.collect()
.await
.unwrap();

let sql_all = "SELECT * FROM y";
let results_all = execute_to_batches(&ctx, sql_all).await;

let expected = vec![
"+---------+---------+",
"| column1 | column2 |",
"+---------+---------+",
"| 5 | 6 |",
"+---------+---------+",
];

assert_batches_eq!(expected, &results_all);

// 'IF NOT EXISTS' cannot coexist with 'REPLACE'
let result = ctx
.sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)")
.await;
assert!(
result.is_err(),
"'IF NOT EXISTS' cannot coexist with 'REPLACE'"
);

Ok(())
}

#[tokio::test]
async fn drop_table() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,8 @@ pub struct CreateMemoryTable {
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
pub if_not_exists: bool,
/// Option to replace table content if table already exists
pub or_replace: bool,
}

/// Creates a view.
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,13 @@ pub fn from_plan(
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
if_not_exists,
or_replace,
..
}) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
name: name.clone(),
if_not_exists: *if_not_exists,
or_replace: *or_replace,
})),
LogicalPlan::CreateView(CreateView {
name, or_replace, ..
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
table_properties,
with_options,
if_not_exists,
or_replace,
..
} if columns.is_empty()
&& constraints.is_empty()
Expand All @@ -173,6 +174,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
name: name.to_string(),
input: Arc::new(plan),
if_not_exists,
or_replace,
}))
}
Statement::CreateView {
Expand Down

0 comments on commit df2094f

Please sign in to comment.