Allow declaring partition columns in PARTITION BY clause, backwards compatible (apache#9599)

* Draft allow both syntaxes

* suppress unused code error

* prevent constraints in partition clauses

* fix clippy

* More tests

* comment and prevent constraints on partition columns

* trailing whitespaces

* End-to-End test of new Hive syntax

---------

Co-authored-by: Mohamed Abdeen <mohamed.abdeen@paytabs.com>
2 people authored and Lordworms committed Apr 1, 2024
1 parent cf566d0 commit 07b75d1
Showing 3 changed files with 94 additions and 11 deletions.
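
In short: `PARTITIONED BY` now accepts either bare column names (the existing DataFusion syntax, with partition columns declared in the main column list) or Hive-style column definitions with types, per the grammar comment below. A minimal sketch of the two forms, assuming the `datafusion-sql` crate (whose `DFParser` this commit changes); the table name and path are illustrative:

use datafusion_sql::parser::DFParser;

fn main() {
    // Existing syntax: partition column declared in the main list,
    // referenced by name in PARTITIONED BY.
    let by_name = "CREATE EXTERNAL TABLE t(c1 INT, p1 INT) STORED AS CSV \
                   PARTITIONED BY (p1) LOCATION 'foo.csv'";

    // New Hive-style syntax: the partition column is defined, with its
    // type, directly inside PARTITIONED BY.
    let by_def = "CREATE EXTERNAL TABLE t(c1 INT) STORED AS CSV \
                  PARTITIONED BY (p1 INT) LOCATION 'foo.csv'";

    for sql in [by_name, by_def] {
        // Both forms parse to a single CREATE EXTERNAL TABLE statement.
        let statements = DFParser::parse_sql(sql).expect("should parse");
        assert_eq!(statements.len(), 1);
    }
}
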
59 changes: 55 additions & 4 deletions datafusion/sql/src/parser.rs
@@ -175,7 +175,7 @@ pub(crate) type LexOrdering = Vec<OrderByExpr>;
/// [ WITH HEADER ROW ]
/// [ DELIMITER <char> ]
/// [ COMPRESSION TYPE <GZIP | BZIP2 | XZ | ZSTD> ]
/// [ PARTITIONED BY (<column list>) ]
/// [ PARTITIONED BY (<column_definition list> | <column list>) ]
/// [ WITH ORDER (<ordered column list>) ]
/// [ OPTIONS (<key_value_list>) ]
/// LOCATION <literal>
@@ -693,7 +693,7 @@ impl<'a> DFParser<'a> {
self.parser
    .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
let table_name = self.parser.parse_object_name(true)?;
let (columns, constraints) = self.parse_columns()?;
let (mut columns, constraints) = self.parse_columns()?;

#[derive(Default)]
struct Builder {
@@ -754,7 +754,30 @@ impl<'a> DFParser<'a> {
Keyword::PARTITIONED => {
    self.parser.expect_keyword(Keyword::BY)?;
    ensure_not_set(&builder.table_partition_cols, "PARTITIONED BY")?;
    builder.table_partition_cols = Some(self.parse_partitions()?);
    // Expects either a list of column names (col_name [, col_name]*)
    // or a list of column definitions (col_name data_type [, col_name data_type]*).
    // Use the token after the name to decide which parsing rule to apply;
    // mixing names and definitions is not allowed.
    let peeked = self.parser.peek_nth_token(2);
    if peeked == Token::Comma || peeked == Token::RParen {
        // list of column names
        builder.table_partition_cols = Some(self.parse_partitions()?)
    } else {
        // list of column defs
        let (cols, cons) = self.parse_columns()?;
        builder.table_partition_cols =
            Some(cols.iter().map(|col| col.name.to_string()).collect());

        columns.extend(cols);

        if !cons.is_empty() {
            return Err(ParserError::ParserError(
                "Constraints on Partition Columns are not supported"
                    .to_string(),
            ));
        }
    }
}
Keyword::OPTIONS => {
    ensure_not_set(&builder.options, "OPTIONS")?;
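
To unpack the lookahead above: when this arm runs, `PARTITIONED BY` has already been consumed, so `peek_nth_token(0)` is the opening `(`, token 1 is the first column name, and token 2 is the discriminator: `,` or `)` means a bare name list, anything else is read as a data type. Two sketches of inputs that hit the rejection paths; the first mirrors the negative tests below, while the second (table-constraint syntax inside the clause) is a hypothetical illustration:

use datafusion_sql::parser::DFParser;

fn main() {
    // Token 2 is `INT`, so the parser commits to column definitions;
    // the bare `c1` later in the list then fails to parse.
    let mixed = "CREATE EXTERNAL TABLE t(c1 INT) STORED AS CSV \
                 PARTITIONED BY (p1 INT, c1) LOCATION 'foo.csv'";
    assert!(DFParser::parse_sql(mixed).is_err());

    // A table constraint inside the clause parses, but is then rejected
    // with "Constraints on Partition Columns are not supported".
    let constrained = "CREATE EXTERNAL TABLE t(c1 INT) STORED AS CSV \
                       PARTITIONED BY (p1 INT, PRIMARY KEY (p1)) LOCATION 'foo.csv'";
    assert!(DFParser::parse_sql(constrained).is_err());
}
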
@@ -1167,9 +1190,37 @@ mod tests {
});
expect_parse_ok(sql, expected)?;

// Error cases: partition column does not support type
// positive case: column definition allowed in `PARTITIONED BY` clause
let sql =
    "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
let expected = Statement::CreateExternalTable(CreateExternalTable {
    name: "t".into(),
    columns: vec![
        make_column_def("c1", DataType::Int(None)),
        make_column_def("p1", DataType::Int(None)),
    ],
    file_type: "CSV".to_string(),
    has_header: false,
    delimiter: ',',
    location: "foo.csv".into(),
    table_partition_cols: vec!["p1".to_string()],
    order_exprs: vec![],
    if_not_exists: false,
    file_compression_type: UNCOMPRESSED,
    unbounded: false,
    options: HashMap::new(),
    constraints: vec![],
});
expect_parse_ok(sql, expected)?;

// negative case: column defs mixed with bare column names in `PARTITIONED BY` clause
let sql =
    "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int, c1) LOCATION 'foo.csv'";
expect_parse_error(sql, "sql parser error: Expected a data type name, found: )");

// negative case: bare column names mixed with column defs in `PARTITIONED BY` clause
let sql =
    "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c1, p1 int) LOCATION 'foo.csv'";
expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");

// positive case: additional options (one entry) can be specified
10 changes: 9 additions & 1 deletion datafusion/sqllogictest/test_files/create_external_table.slt
@@ -100,9 +100,17 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv';
statement error DataFusion error: SQL error: ParserError\("Unexpected token FOOBAR"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo.csv';

# Missing partition column
statement error DataFusion error: Arrow error: Schema error: Unable to get field named "c2". Valid fields: \["c1"\]
create EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c2) LOCATION 'foo.csv'

# Duplicate Column in `PARTITIONED BY` clause
statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name c1
create EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV PARTITIONED BY (c1 int) LOCATION 'foo.csv'

# Conflicting options
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "column_index_truncate_length" not found on CsvOptions
CREATE EXTERNAL TABLE csv_table (column1 int)
STORED AS CSV
LOCATION 'foo.csv'
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
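
The duplicate-column test above fails while building the table schema, not while parsing, so it only surfaces when the DDL is executed. A minimal sketch of reproducing it programmatically, assuming the `datafusion` crate and a Tokio runtime (the table name and `foo.csv` path are illustrative):

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // `c1` is declared in the column list and again, with a type, in
    // PARTITIONED BY; the duplicate unqualified field is rejected.
    let err = ctx
        .sql(
            "CREATE EXTERNAL TABLE t(c1 INT, c2 INT) STORED AS CSV \
             PARTITIONED BY (c1 INT) LOCATION 'foo.csv'",
        )
        .await
        .unwrap_err();
    assert!(err.to_string().contains("duplicate unqualified field name"));
}
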
36 changes: 30 additions & 6 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Expand Up @@ -42,7 +42,7 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'


statement ok
create table dictionary_encoded_values as values
('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar', 'Dictionary(Int32, Utf8)'));

query TTT
@@ -55,13 +55,13 @@
CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
a varchar,
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b);

query TT
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
----
2
@@ -76,21 +76,21 @@
CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
a varchar,
b varchar,
)
STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);

query TT
insert into dictionary_encoded_arrow_partitioned
select * from dictionary_encoded_values
----
2

statement ok
CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
a varchar,
)
STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/';

@@ -185,6 +185,30 @@ select * from partitioned_insert_test_verify;
1
2

statement ok
CREATE EXTERNAL TABLE
partitioned_insert_test_hive(c bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned'
PARTITIONED BY (a string, b string);

query ITT
INSERT INTO partitioned_insert_test_hive VALUES (3,30,300);
----
1

query ITT
SELECT * FROM partitioned_insert_test_hive order by a,b,c;
----
1 10 100
1 10 200
1 20 100
2 20 100
1 20 200
2 20 200
3 30 300
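
Note the read-back order above: the file column `c` comes first and the Hive-declared partition columns `a` and `b` are appended, which is why the inserted row `(3,30,300)` comes back as `3 30 300` with `c = 3`, `a = '30'`, `b = '300'`. A hedged sketch of inspecting that resolved schema, assuming the `datafusion` crate and an existing partitioned location:

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql(
        "CREATE EXTERNAL TABLE t(c BIGINT) STORED AS CSV \
         LOCATION './scratch/partitioned/' PARTITIONED BY (a STRING, b STRING)",
    )
    .await?;

    // The logical schema lists the file column first, then the partition
    // columns appended at the end: c, a, b.
    let df = ctx.sql("SELECT * FROM t").await?;
    println!("{}", df.schema());
    Ok(())
}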


statement ok
CREATE EXTERNAL TABLE
partitioned_insert_test_json(a string, b string)
