
[SPARK-27108][SQL] Add parsed SQL plans for create, CTAS. #24029

Closed
@@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
    if (STRING == null) structField else structField.withComment(string(STRING))
  }

Contributor:

I didn't review this file carefully, assuming it's just copy-paste code from SparkSqlAstBuilder

Contributor Author:

Yes, this is moving what is needed from SparkSqlAstBuilder. The only real changes are in the visitCreateTable method.

These rules should have already been in the abstract class. I'm not sure why they were in SparkSqlAstBuilder other than that was the easiest place to put them when they were added.

  /**
   * Create location string.
   */
  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
    string(ctx.STRING)
  }

  /**
   * Create a [[BucketSpec]].
   */
  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
    BucketSpec(
      ctx.INTEGER_VALUE.getText.toInt,
      visitIdentifierList(ctx.identifierList),
      Option(ctx.orderedIdentifierList)
        .toSeq
        .flatMap(_.orderedIdentifier.asScala)
        .map { orderedIdCtx =>
          Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
            if (dir.toLowerCase(Locale.ROOT) != "asc") {
              operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx)
            }
          }

          orderedIdCtx.identifier.getText
        })
  }
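
  // Illustrative sketch (not part of the diff): a clause such as
  //   CLUSTERED BY (id) SORTED BY (ts ASC) INTO 8 BUCKETS
  // is parsed by this visitor into
  //   BucketSpec(8, Seq("id"), Seq("ts"))
  // and any SORTED BY direction other than ASC is rejected with operationNotAllowed.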

  /**
   * Convert a table property list into a key-value map.
   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
   */
  override def visitTablePropertyList(
      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
    val properties = ctx.tableProperty.asScala.map { property =>
      val key = visitTablePropertyKey(property.key)
      val value = visitTablePropertyValue(property.value)
      key -> value
    }
    // Check for duplicate property names.
    checkDuplicateKeys(properties, ctx)
    properties.toMap
  }

  /**
   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
   */
  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
    val props = visitTablePropertyList(ctx)
    val badKeys = props.collect { case (key, null) => key }
    if (badKeys.nonEmpty) {
      operationNotAllowed(
        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
    }
    props
  }

  /**
   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
   */
  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
    val props = visitTablePropertyList(ctx)
    val badKeys = props.filter { case (_, v) => v != null }.keys
    if (badKeys.nonEmpty) {
      operationNotAllowed(
        s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
    }
    props.keys.toSeq
  }

  /**
   * A table property key can either be a String or a collection of dot-separated elements. This
   * function extracts the property key based on whether it is a string literal or a table
   * property identifier.
   */
  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
    if (key.STRING != null) {
      string(key.STRING)
    } else {
      key.getText
    }
  }

  /**
   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
   * the property value based on whether it is a string, integer, boolean or decimal literal.
   */
  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
    if (value == null) {
      null
    } else if (value.STRING != null) {
      string(value.STRING)
    } else if (value.booleanValue != null) {
      value.getText.toLowerCase(Locale.ROOT)
    } else {
      value.getText
    }
  }
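
  // Illustrative sketch (not part of the diff): a list such as
  //   TBLPROPERTIES ('k1'='v1', 'k2'='v2')
  // parses to Map("k1" -> "v1", "k2" -> "v2") via visitPropertyKeyValues, while a
  // key-only list such as ('k1', 'k2') must go through visitPropertyKeys, which
  // returns Seq("k1", "k2") and rejects any entry that carries a value.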

  /**
   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
   */
  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)

  /**
   * Validate a create table statement and return the [[TableIdentifier]].
   */
  override def visitCreateTableHeader(
      ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
    val temporary = ctx.TEMPORARY != null
    val ifNotExists = ctx.EXISTS != null
    if (temporary && ifNotExists) {
      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
    }
    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
  }
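
  // Illustrative sketch (not part of the diff): the header of
  //   CREATE TABLE IF NOT EXISTS db.t ...
  // yields (TableIdentifier("t", Some("db")), false, true, false), and
  //   CREATE TEMPORARY TABLE t ...
  // yields (TableIdentifier("t"), true, false, false).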

  /**
   * Create a table, returning a [[CreateTableStatement]] logical plan.
   *
   * Expected format:
   * {{{
   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
   *   USING table_provider
   *   create_table_clauses
   *   [[AS] select_statement];
   *
   *   create_table_clauses (order insensitive):
   *     [OPTIONS table_property_list]
   *     [PARTITIONED BY (col_name, col_name, ...)]
   *     [CLUSTERED BY (col_name, col_name, ...)
   *       [SORTED BY (col_name [ASC|DESC], ...)]
   *       INTO num_buckets BUCKETS
   *     ]
   *     [LOCATION path]
   *     [COMMENT table_comment]
   *     [TBLPROPERTIES (property_name=property_value, ...)]
   * }}}
   */
  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    if (external) {
      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
    }

    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

    val schema = Option(ctx.colTypeList()).map(createSchema)
    val partitionCols: Seq[String] =
      Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil)
    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)

    val provider = ctx.tableProvider.qualifiedName.getText
    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
    val comment = Option(ctx.comment).map(string)

    Option(ctx.query).map(plan) match {
      case Some(_) if temp =>
        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)

      case Some(_) if schema.isDefined =>
        operationNotAllowed(
          "Schema may not be specified in a Create Table As Select (CTAS) statement",
          ctx)

      case Some(query) =>
        CreateTableAsSelectStatement(
          table, query, partitionCols, bucketSpec, properties, provider, options, location,
          comment, ifNotExists = ifNotExists)

      case None if temp =>
        // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
        // Use CREATE TEMPORARY VIEW ... USING ... instead.
        operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)

      case _ =>
        CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec,
          properties, provider, options, location, comment, ifNotExists = ifNotExists)
    }
  }
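
  // Illustrative sketch (not part of the diff): a statement such as
  //   CREATE TABLE IF NOT EXISTS db.t (id INT, ts STRING) USING parquet
  //   PARTITIONED BY (ts) COMMENT 'demo'
  // parses to
  //   CreateTableStatement(TableIdentifier("t", Some("db")),
  //     new StructType().add("id", "int").add("ts", "string"), Seq("ts"), None,
  //     Map.empty, "parquet", Map.empty, None, Some("demo"), ifNotExists = true)
  // while replacing the column list with AS SELECT ... produces a
  // CreateTableAsSelectStatement carrying the parsed query as a child.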

}
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

/**
 * A CREATE TABLE command, as parsed from SQL.
 *
 * This is a metadata-only command and is not used to write data to the created table.
 */
case class CreateTableStatement(
    table: TableIdentifier,
    tableSchema: StructType,
    partitioning: Seq[String],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String],
    provider: String,
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    ifNotExists: Boolean) extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty

  override def children: Seq[LogicalPlan] = Seq.empty
}

/**
 * A CREATE TABLE AS SELECT command, as parsed from SQL.
 */
case class CreateTableAsSelectStatement(
    table: TableIdentifier,
    asSelect: LogicalPlan,
    partitioning: Seq[String],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String],
    provider: String,
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    ifNotExists: Boolean) extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty

  override def children: Seq[LogicalPlan] = Seq(asSelect)
}
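
// Illustrative sketch (not part of the diff): downstream rules can convert these
// parsed nodes once per implementation, for example:
//   plan resolveOperators {
//     case c: CreateTableStatement if useV2Catalog => toV2CreateTable(c)
//     case c: CreateTableStatement => toV1CreateTable(c)
//   }
// where useV2Catalog, toV2CreateTable, and toV1CreateTable are hypothetical helpers.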
@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * A logical plan node that contains exactly what was parsed from SQL.
 *
 * This is used to hold information parsed from SQL when there are multiple implementations of a
 * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2.
 * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then
 * converting that v1 metadata to the v2 equivalent, the SQL [[CreateTableStatement]] plan is
 * produced by the parser and converted once into both implementations.
 *
 * Parsed logical plans are not resolved because they must be converted to concrete logical plans.
 *
 * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible can
 * be kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
 */
*/
private[sql] abstract class ParsedStatement extends LogicalPlan {
  // Redact properties and options when parsed nodes are used by generic methods like toString.
  override def productIterator: Iterator[Any] = super.productIterator.map {
    case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]])
    case other => other
  }

  final override lazy val resolved = false
}
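
// Illustrative sketch (not part of the diff): because productIterator redacts map
// arguments, printing a parsed node with sensitive options would show something like
//   CreateTableStatement(`db`.`t`, ..., Map(password -> *********(redacted)), ...)
// rather than the raw values, assuming the key matches
// spark.sql.redaction.options.regex.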