Skip to content

Commit

Permalink
[SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst
Browse files Browse the repository at this point in the history
This moves parsing logic for `ALTER TABLE` into Catalyst and adds parsed logical plans for alter table changes that use multi-part identifiers. This PR is similar to SPARK-27108, PR apache#24029, that created parsed logical plans for create and CTAS.

* Create parsed logical plans
* Move parsing logic into Catalyst's AstBuilder
* Convert to DataSource plans in DataSourceResolution
* Parse `ALTER TABLE ... SET LOCATION ...` separately from the partition variant
* Parse `ALTER TABLE ... ALTER COLUMN ... [TYPE dataType] [COMMENT comment]` [as discussed on the dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Syntax-for-table-DDL-td25197.html#a25270)
* Parse `ALTER TABLE ... RENAME COLUMN ... TO ...`
* Parse `ALTER TABLE ... DROP COLUMNS ...`

* Added new tests in Catalyst's `DDLParserSuite`
* Moved converted plan tests from SQL `DDLParserSuite` to `PlanResolutionSuite`
* Existing tests for regressions

Closes apache#24723 from rdblue/SPARK-27857-add-alter-table-statements-in-catalyst.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information
rdblue authored and mccheah committed Jun 6, 2019
1 parent d9e0cca commit e1365ba
Show file tree
Hide file tree
Showing 11 changed files with 701 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,27 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE tableIdentifier
ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
columns=qualifiedColTypeWithPositionList #addTableColumns
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
'(' columns=qualifiedColTypeWithPositionList ')' #addTableColumns
| ALTER TABLE multipartIdentifier
RENAME COLUMN from=qualifiedName TO to=identifier #renameTableColumn
| ALTER TABLE multipartIdentifier
DROP (COLUMN | COLUMNS) '(' columns=qualifiedNameList ')' #dropTableColumns
| ALTER TABLE multipartIdentifier
DROP (COLUMN | COLUMNS) columns=qualifiedNameList #dropTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
| ALTER (TABLE | VIEW) multipartIdentifier
SET TBLPROPERTIES tablePropertyList #setTableProperties
| ALTER (TABLE | VIEW) tableIdentifier
| ALTER (TABLE | VIEW) multipartIdentifier
UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList #unsetTableProperties
| ALTER TABLE multipartIdentifier
(ALTER | CHANGE) COLUMN? qualifiedName
(TYPE dataType)? (COMMENT comment=STRING)? colPosition? #alterTableColumn
| ALTER TABLE tableIdentifier partitionSpec?
CHANGE COLUMN? identifier colType colPosition? #changeColumn
| ALTER TABLE tableIdentifier (partitionSpec)?
Expand All @@ -134,7 +147,8 @@ statement
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier partitionSpec SET locationSpec #setPartitionLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
Expand Down Expand Up @@ -690,7 +704,7 @@ intervalValue
;

colPosition
: FIRST | AFTER identifier
: FIRST | AFTER qualifiedName
;

dataType
Expand All @@ -700,6 +714,14 @@ dataType
| identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')? #primitiveDataType
;

qualifiedColTypeWithPositionList
: qualifiedColTypeWithPosition (',' qualifiedColTypeWithPosition)*
;

qualifiedColTypeWithPosition
: name=qualifiedName dataType (COMMENT comment=STRING)? colPosition?
;

colTypeList
: colType (',' colType)*
;
Expand Down Expand Up @@ -752,6 +774,10 @@ frameBound
| expression boundType=(PRECEDING | FOLLOWING)
;

qualifiedNameList
: qualifiedName (',' qualifiedName)*
;

qualifiedName
: identifier ('.' identifier)*
;
Expand Down Expand Up @@ -1253,6 +1279,7 @@ nonReserved
| TRANSFORM
| TRUE
| TRUNCATE
| TYPE
| UNARCHIVE
| UNBOUNDED
| UNCACHE
Expand Down Expand Up @@ -1379,6 +1406,7 @@ RESET: 'RESET';
DATA: 'DATA';
START: 'START';
TRANSACTION: 'TRANSACTION';
<<<<<<< HEAD
COMMIT: 'COMMIT';
ROLLBACK: 'ROLLBACK';
MACRO: 'MACRO';
Expand All @@ -1390,6 +1418,64 @@ TRAILING: 'TRAILING';
IF: 'IF';
POSITION: 'POSITION';
EXTRACT: 'EXTRACT';
||||||| parent of 5d6758c0e7... [SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst
TRANSACTIONS: 'TRANSACTIONS';
TRANSFORM: 'TRANSFORM';
TRUE: 'TRUE';
TRUNCATE: 'TRUNCATE';
UNARCHIVE: 'UNARCHIVE';
UNBOUNDED: 'UNBOUNDED';
UNCACHE: 'UNCACHE';
UNION: 'UNION';
UNIQUE: 'UNIQUE';
UNLOCK: 'UNLOCK';
UNSET: 'UNSET';
USE: 'USE';
USER: 'USER';
USING: 'USING';
VALUES: 'VALUES';
VIEW: 'VIEW';
WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
WHERE: 'WHERE';
WINDOW: 'WINDOW';
WITH: 'WITH';
YEAR: 'YEAR';
YEARS: 'YEARS';
//============================
// End of the keywords list
//============================
=======
TRANSACTIONS: 'TRANSACTIONS';
TRANSFORM: 'TRANSFORM';
TRUE: 'TRUE';
TRUNCATE: 'TRUNCATE';
TYPE: 'TYPE';
UNARCHIVE: 'UNARCHIVE';
UNBOUNDED: 'UNBOUNDED';
UNCACHE: 'UNCACHE';
UNION: 'UNION';
UNIQUE: 'UNIQUE';
UNLOCK: 'UNLOCK';
UNSET: 'UNSET';
USE: 'USE';
USER: 'USER';
USING: 'USING';
VALUES: 'VALUES';
VIEW: 'VIEW';
WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
WHERE: 'WHERE';
WINDOW: 'WINDOW';
WITH: 'WITH';
YEAR: 'YEAR';
YEARS: 'YEARS';
//============================
// End of the keywords list
//============================
>>>>>>> 5d6758c0e7... [SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst

EQ : '=' | '==';
NSEQ: '<=>';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -1991,6 +1991,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
}

/**
* Parse a qualified name to a multipart name.
*/
override def visitQualifiedName(ctx: QualifiedNameContext): Seq[String] = withOrigin(ctx) {
ctx.identifier.asScala.map(_.getText)
}

/**
* Parse a list of transforms.
*/
Expand Down Expand Up @@ -2023,8 +2030,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

ctx.transforms.asScala.map {
case identityCtx: IdentityTransformContext =>
IdentityTransform(FieldReference(
identityCtx.qualifiedName.identifier.asScala.map(_.getText)))
IdentityTransform(FieldReference(typedVisit[Seq[String]](identityCtx.qualifiedName)))

case applyCtx: ApplyTransformContext =>
val arguments = applyCtx.argument.asScala.map(visitTransformArgument)
Expand Down Expand Up @@ -2071,7 +2077,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitTransformArgument(ctx: TransformArgumentContext): v2.expressions.Expression = {
withOrigin(ctx) {
val reference = Option(ctx.qualifiedName)
.map(nameCtx => FieldReference(nameCtx.identifier.asScala.map(_.getText)))
.map(typedVisit[Seq[String]])
.map(FieldReference(_))
val literal = Option(ctx.constant)
.map(typedVisit[Literal])
.map(lit => LiteralValue(lit.value, lit.dataType))
Expand Down Expand Up @@ -2169,4 +2176,151 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
visitMultipartIdentifier(ctx.multipartIdentifier()),
ctx.EXISTS != null)
}

/**
* Parse new column info from ADD COLUMN into a QualifiedColType.
*/
override def visitQualifiedColTypeWithPosition(
ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
if (ctx.colPosition != null) {
operationNotAllowed("ALTER TABLE table ADD COLUMN ... FIRST | AFTER otherCol", ctx)
}

QualifiedColType(
typedVisit[Seq[String]](ctx.name),
typedVisit[DataType](ctx.dataType),
Option(ctx.comment).map(string))
}

/**
* Parse a [[AlterTableAddColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType])
)
}

/**
* Parse a [[AlterTableRenameColumnStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 RENAME COLUMN a.b.c TO x
* }}}
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenameColumnStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.from.identifier.asScala.map(_.getText),
ctx.to.getText)
}

/**
* Parse a [[AlterTableAlterColumnStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
* ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint COMMENT 'new comment'
* ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
* }}}
*/
override def visitAlterTableColumn(
ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) {
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
if (ctx.colPosition != null) {
operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx)
}

if (ctx.dataType == null && ctx.comment == null) {
operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx)
}

AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
typedVisit[Seq[String]](ctx.qualifiedName),
Option(ctx.dataType).map(typedVisit[DataType]),
Option(ctx.comment).map(string))
}

/**
* Parse a [[AlterTableDropColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 DROP COLUMN a.b.c
* ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
* }}}
*/
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val columnsToDrop = ctx.columns.qualifiedName.asScala.map(typedVisit[Seq[String]])
AlterTableDropColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
columnsToDrop)
}

/**
* Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment);
* ALTER VIEW view SET TBLPROPERTIES ('comment' = new_comment);
* }}}
*/
override def visitSetTableProperties(
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val identifier = visitMultipartIdentifier(ctx.multipartIdentifier)
val properties = visitPropertyKeyValues(ctx.tablePropertyList)
if (ctx.VIEW != null) {
AlterViewSetPropertiesStatement(identifier, properties)
} else {
AlterTableSetPropertiesStatement(identifier, properties)
}
}

/**
* Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val identifier = visitMultipartIdentifier(ctx.multipartIdentifier)
val properties = visitPropertyKeys(ctx.tablePropertyList)
val ifExists = ctx.EXISTS != null
if (ctx.VIEW != null) {
AlterViewUnsetPropertiesStatement(identifier, properties, ifExists)
} else {
AlterTableUnsetPropertiesStatement(identifier, properties, ifExists)
}
}

/**
* Create an [[AlterTableSetLocationStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table SET LOCATION "loc";
* }}}
*/
override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetLocationStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
visitLocationSpec(ctx.locationSpec))
}
}
Loading

0 comments on commit e1365ba

Please sign in to comment.