Skip to content

Commit

Permalink
[SPARK-27857][SQL] Move ALTER TABLE parsing into Catalyst
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This moves parsing logic for `ALTER TABLE` into Catalyst and adds parsed logical plans for alter table changes that use multi-part identifiers. This PR is similar to SPARK-27108, PR apache#24029, that created parsed logical plans for create and CTAS.

* Create parsed logical plans
* Move parsing logic into Catalyst's AstBuilder
* Convert to DataSource plans in DataSourceResolution
* Parse `ALTER TABLE ... SET LOCATION ...` separately from the partition variant
* Parse `ALTER TABLE ... ALTER COLUMN ... [TYPE dataType] [COMMENT comment]` [as discussed on the dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Syntax-for-table-DDL-td25197.html#a25270)
* Parse `ALTER TABLE ... RENAME COLUMN ... TO ...`
* Parse `ALTER TABLE ... DROP COLUMNS ...`

## How was this patch tested?

* Added new tests in Catalyst's `DDLParserSuite`
* Moved converted plan tests from SQL `DDLParserSuite` to `PlanResolutionSuite`
* Existing tests for regressions

Closes apache#24723 from rdblue/SPARK-27857-add-alter-table-statements-in-catalyst.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information
rdblue authored and emanuelebardelli committed Jun 15, 2019
1 parent 2213da0 commit 339cd29
Show file tree
Hide file tree
Showing 11 changed files with 635 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,27 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE tableIdentifier
ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
columns=qualifiedColTypeWithPositionList #addTableColumns
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
'(' columns=qualifiedColTypeWithPositionList ')' #addTableColumns
| ALTER TABLE multipartIdentifier
RENAME COLUMN from=qualifiedName TO to=identifier #renameTableColumn
| ALTER TABLE multipartIdentifier
DROP (COLUMN | COLUMNS) '(' columns=qualifiedNameList ')' #dropTableColumns
| ALTER TABLE multipartIdentifier
DROP (COLUMN | COLUMNS) columns=qualifiedNameList #dropTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
| ALTER (TABLE | VIEW) multipartIdentifier
SET TBLPROPERTIES tablePropertyList #setTableProperties
| ALTER (TABLE | VIEW) tableIdentifier
| ALTER (TABLE | VIEW) multipartIdentifier
UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList #unsetTableProperties
| ALTER TABLE multipartIdentifier
(ALTER | CHANGE) COLUMN? qualifiedName
(TYPE dataType)? (COMMENT comment=STRING)? colPosition? #alterTableColumn
| ALTER TABLE tableIdentifier partitionSpec?
CHANGE COLUMN? identifier colType colPosition? #changeColumn
| ALTER TABLE tableIdentifier (partitionSpec)?
Expand All @@ -137,7 +150,8 @@ statement
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier partitionSpec SET locationSpec #setPartitionLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
Expand Down Expand Up @@ -720,7 +734,7 @@ intervalUnit
;

colPosition
: FIRST | AFTER identifier
: FIRST | AFTER qualifiedName
;

dataType
Expand All @@ -730,6 +744,14 @@ dataType
| identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')? #primitiveDataType
;

qualifiedColTypeWithPositionList
: qualifiedColTypeWithPosition (',' qualifiedColTypeWithPosition)*
;

qualifiedColTypeWithPosition
: name=qualifiedName dataType (COMMENT comment=STRING)? colPosition?
;

colTypeList
: colType (',' colType)*
;
Expand Down Expand Up @@ -782,6 +804,10 @@ frameBound
| expression boundType=(PRECEDING | FOLLOWING)
;

qualifiedNameList
: qualifiedName (',' qualifiedName)*
;

qualifiedName
: identifier ('.' identifier)*
;
Expand Down Expand Up @@ -1244,6 +1270,7 @@ nonReserved
| TRANSFORM
| TRUE
| TRUNCATE
| TYPE
| UNARCHIVE
| UNBOUNDED
| UNCACHE
Expand Down Expand Up @@ -1499,6 +1526,7 @@ TRANSACTIONS: 'TRANSACTIONS';
TRANSFORM: 'TRANSFORM';
TRUE: 'TRUE';
TRUNCATE: 'TRUNCATE';
TYPE: 'TYPE';
UNARCHIVE: 'UNARCHIVE';
UNBOUNDED: 'UNBOUNDED';
UNCACHE: 'UNCACHE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -2035,6 +2035,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
}

/**
* Parse a qualified name to a multipart name.
*/
override def visitQualifiedName(ctx: QualifiedNameContext): Seq[String] = withOrigin(ctx) {
ctx.identifier.asScala.map(_.getText)
}

/**
* Parse a list of transforms.
*/
Expand Down Expand Up @@ -2067,8 +2074,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

ctx.transforms.asScala.map {
case identityCtx: IdentityTransformContext =>
IdentityTransform(FieldReference(
identityCtx.qualifiedName.identifier.asScala.map(_.getText)))
IdentityTransform(FieldReference(typedVisit[Seq[String]](identityCtx.qualifiedName)))

case applyCtx: ApplyTransformContext =>
val arguments = applyCtx.argument.asScala.map(visitTransformArgument)
Expand Down Expand Up @@ -2115,7 +2121,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitTransformArgument(ctx: TransformArgumentContext): v2.expressions.Expression = {
withOrigin(ctx) {
val reference = Option(ctx.qualifiedName)
.map(nameCtx => FieldReference(nameCtx.identifier.asScala.map(_.getText)))
.map(typedVisit[Seq[String]])
.map(FieldReference(_))
val literal = Option(ctx.constant)
.map(typedVisit[Literal])
.map(lit => LiteralValue(lit.value, lit.dataType))
Expand Down Expand Up @@ -2213,4 +2220,151 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
visitMultipartIdentifier(ctx.multipartIdentifier()),
ctx.EXISTS != null)
}

/**
* Parse new column info from ADD COLUMN into a QualifiedColType.
*/
override def visitQualifiedColTypeWithPosition(
ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
if (ctx.colPosition != null) {
operationNotAllowed("ALTER TABLE table ADD COLUMN ... FIRST | AFTER otherCol", ctx)
}

QualifiedColType(
typedVisit[Seq[String]](ctx.name),
typedVisit[DataType](ctx.dataType),
Option(ctx.comment).map(string))
}

/**
* Parse a [[AlterTableAddColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType])
)
}

/**
* Parse a [[AlterTableRenameColumnStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 RENAME COLUMN a.b.c TO x
* }}}
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenameColumnStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.from.identifier.asScala.map(_.getText),
ctx.to.getText)
}

/**
* Parse a [[AlterTableAlterColumnStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
* ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint COMMENT 'new comment'
* ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
* }}}
*/
override def visitAlterTableColumn(
ctx: AlterTableColumnContext): LogicalPlan = withOrigin(ctx) {
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
if (ctx.colPosition != null) {
operationNotAllowed(s"ALTER TABLE table $verb COLUMN ... FIRST | AFTER otherCol", ctx)
}

if (ctx.dataType == null && ctx.comment == null) {
operationNotAllowed(s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT", ctx)
}

AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
typedVisit[Seq[String]](ctx.qualifiedName),
Option(ctx.dataType).map(typedVisit[DataType]),
Option(ctx.comment).map(string))
}

/**
* Parse a [[AlterTableDropColumnsStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table1 DROP COLUMN a.b.c
* ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
* }}}
*/
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val columnsToDrop = ctx.columns.qualifiedName.asScala.map(typedVisit[Seq[String]])
AlterTableDropColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
columnsToDrop)
}

/**
* Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table SET TBLPROPERTIES ('comment' = new_comment);
* ALTER VIEW view SET TBLPROPERTIES ('comment' = new_comment);
* }}}
*/
override def visitSetTableProperties(
ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val identifier = visitMultipartIdentifier(ctx.multipartIdentifier)
val properties = visitPropertyKeyValues(ctx.tablePropertyList)
if (ctx.VIEW != null) {
AlterViewSetPropertiesStatement(identifier, properties)
} else {
AlterTableSetPropertiesStatement(identifier, properties)
}
}

/**
* Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands.
*
* For example:
* {{{
* ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
* }}}
*/
override def visitUnsetTableProperties(
ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
val identifier = visitMultipartIdentifier(ctx.multipartIdentifier)
val properties = visitPropertyKeys(ctx.tablePropertyList)
val ifExists = ctx.EXISTS != null
if (ctx.VIEW != null) {
AlterViewUnsetPropertiesStatement(identifier, properties, ifExists)
} else {
AlterTableUnsetPropertiesStatement(identifier, properties, ifExists)
}
}

/**
* Create an [[AlterTableSetLocationStatement]] command.
*
* For example:
* {{{
* ALTER TABLE table SET LOCATION "loc";
* }}}
*/
override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetLocationStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
visitLocationSpec(ctx.locationSpec))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.types.DataType

/**
* Column data as parsed by ALTER TABLE ... ADD COLUMNS.
*/
case class QualifiedColType(name: Seq[String], dataType: DataType, comment: Option[String])

/**
* ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
*/
case class AlterTableAddColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement

/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
case class AlterTableAlterColumnStatement(
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
comment: Option[String]) extends ParsedStatement

/**
* ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
*/
case class AlterTableRenameColumnStatement(
tableName: Seq[String],
column: Seq[String],
newName: String) extends ParsedStatement

/**
* ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
*/
case class AlterTableDropColumnsStatement(
tableName: Seq[String],
columnsToDrop: Seq[Seq[String]]) extends ParsedStatement

/**
* ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL.
*/
case class AlterTableSetPropertiesStatement(
tableName: Seq[String],
properties: Map[String, String]) extends ParsedStatement

/**
* ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL.
*/
case class AlterTableUnsetPropertiesStatement(
tableName: Seq[String],
propertyKeys: Seq[String],
ifExists: Boolean) extends ParsedStatement

/**
* ALTER TABLE ... SET LOCATION command, as parsed from SQL.
*/
case class AlterTableSetLocationStatement(
tableName: Seq[String],
location: String) extends ParsedStatement
Loading

0 comments on commit 339cd29

Please sign in to comment.