Skip to content

Commit

Permalink
Merge pull request #462 from splitgraph/copy-to-stmt
Browse files Browse the repository at this point in the history
Wire up COPY TO statement
  • Loading branch information
gruuya authored Sep 7, 2023
2 parents 75a613f + 51ff22a commit bb1ecaa
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 6 deletions.
25 changes: 20 additions & 5 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::sql::parser::{CopyToSource, CopyToStatement};
use datafusion::{
arrow::{
datatypes::{Schema, SchemaRef},
Expand Down Expand Up @@ -374,6 +375,7 @@ pub fn is_read_only(plan: &LogicalPlan) -> bool {
| LogicalPlan::Ddl(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Copy(_)
)
}

Expand Down Expand Up @@ -1160,14 +1162,20 @@ impl SeafowlContext for DefaultSeafowlContext {
"Unsupported SQL statement: {s:?}"
))),
},
DFStatement::CopyTo(CopyToStatement { ref mut source, .. }) => {
let state = if let CopyToSource::Query(ref mut query) = source {
self.rewrite_time_travel_query(query).await?
} else {
self.inner.state()
};
state.statement_to_plan(stmt).await
}
DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
)))
}
DFStatement::Explain(_) => Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
))),
}
}

Expand Down Expand Up @@ -1196,6 +1204,13 @@ impl SeafowlContext for DefaultSeafowlContext {
// Similarly to DataFrame::sql, run certain logical plans outside of the actual execution flow
// and produce a dummy physical plan instead
match plan {
LogicalPlan::Copy(_) => {
let physical = self.inner.state().create_physical_plan(plan).await?;

// Eagerly execute the COPY TO plan to align with other DML plans in here.
self.collect(physical).await?;
Ok(make_dummy_exec())
}
// CREATE EXTERNAL TABLE copied from DataFusion's source code
// It uses ListingTable which queries data at a given location using the ObjectStore
// abstraction (URL: scheme://some-path.to.file.parquet) and it's easier to reuse this
Expand Down
69 changes: 68 additions & 1 deletion src/datafusion/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
//! Declares a SQL parser based on sqlparser that handles custom formats that we need.

pub use datafusion::sql::parser::Statement;
use datafusion::sql::parser::{CreateExternalTable, DescribeTableStmt};
use datafusion::sql::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DescribeTableStmt,
};
use datafusion_common::parsers::CompressionTypeVariant;
use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value};
use sqlparser::tokenizer::{TokenWithLocation, Word};
Expand Down Expand Up @@ -144,6 +146,13 @@ impl<'a> DFParser<'a> {
// use custom parsing
self.parse_create()
}
Word {
keyword: Keyword::COPY,
..
} => {
self.parser.next_token();
self.parse_copy()
}
Word {
keyword: Keyword::DESCRIBE,
..
Expand Down Expand Up @@ -211,6 +220,37 @@ impl<'a> DFParser<'a> {
})))
}

/// Parse a SQL `COPY TO` statement
pub fn parse_copy(&mut self) -> Result<Statement, ParserError> {
// parse as a query
let source = if self.parser.consume_token(&Token::LParen) {
let query = self.parser.parse_query()?;
self.parser.expect_token(&Token::RParen)?;
CopyToSource::Query(query)
} else {
// parse as table reference
let table_name = self.parser.parse_object_name()?;
CopyToSource::Relation(table_name)
};

self.parser.expect_keyword(Keyword::TO)?;

let target = self.parser.parse_literal_string()?;

// check for options in parens
let options = if self.parser.peek_token().token == Token::LParen {
self.parse_value_options()?
} else {
vec![]
};

Ok(Statement::CopyTo(CopyToStatement {
source,
target,
options,
}))
}

/// Parse the next token as a key name for an option list
///
/// Note this is different than [`parse_literal_string`]
Expand Down Expand Up @@ -620,6 +660,33 @@ impl<'a> DFParser<'a> {
Ok(options)
}

/// Parses (key value) style options into a map of String --> [`Value`].
///
/// Unlike [`Self::parse_string_options`], this method supports
/// keywords as key names as well as multiple value types such as
/// Numbers as well as Strings.
fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>, ParserError> {
let mut options = vec![];
self.parser.expect_token(&Token::LParen)?;

loop {
let key = self.parse_option_key()?;
let value = self.parse_option_value()?;
options.push((key, value));
let comma = self.parser.consume_token(&Token::Comma);
if self.parser.consume_token(&Token::RParen) {
// allow a trailing comma, even though it's not in standard
break;
} else if !comma {
return self.expected(
"',' or ')' after option definition",
self.parser.peek_token(),
);
}
}
Ok(options)
}

fn parse_delimiter(&mut self) -> Result<char, ParserError> {
let token = self.parser.parse_literal_string()?;
match token.len() {
Expand Down
41 changes: 41 additions & 0 deletions tests/statements/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,44 @@ async fn test_update_statement_errors() {
.to_string()
.contains("Cannot cast string 'nope' to value of Int64 type"));
}

#[tokio::test]
async fn test_copy_to_statement() -> Result<(), DataFusionError> {
let (context, _) = make_context_with_pg(ObjectStoreType::InMemory).await;
create_table_and_insert(&context, "test_table").await;

let temp_dir = TempDir::new().unwrap();
let location = format!("{}/copy.parquet", temp_dir.path().to_string_lossy());

// Execute the COPY TO statement
context
.plan_query(format!("COPY test_table TO '{location}'").as_str())
.await?;

// Check results
context
.plan_query(
format!(
"CREATE EXTERNAL TABLE copied_table \
STORED AS PARQUET \
LOCATION '{location}'"
)
.as_str(),
)
.await?;

let results_original = context
.collect(context.plan_query("SELECT * FROM test_table").await?)
.await?;
let results_copied = context
.collect(
context
.plan_query("SELECT * FROM staging.copied_table")
.await?,
)
.await?;

assert_eq!(results_original, results_copied);

Ok(())
}

0 comments on commit bb1ecaa

Please sign in to comment.