diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 38a61b8731778..52a5d2c66c5b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true) if (STRING == null) structField else structField.withComment(string(STRING)) } + + /** + * Create location string. + */ + override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { + string(ctx.STRING) + } + + /** + * Create a [[BucketSpec]]. + */ + override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { + BucketSpec( + ctx.INTEGER_VALUE.getText.toInt, + visitIdentifierList(ctx.identifierList), + Option(ctx.orderedIdentifierList) + .toSeq + .flatMap(_.orderedIdentifier.asScala) + .map { orderedIdCtx => + Option(orderedIdCtx.ordering).map(_.getText).foreach { dir => + if (dir.toLowerCase(Locale.ROOT) != "asc") { + operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx) + } + } + + orderedIdCtx.identifier.getText + }) + } + + /** + * Convert a table property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitTablePropertyList( + ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.tableProperty.asScala.map { property => + val key = visitTablePropertyKey(property.key) + val value = visitTablePropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { + val props = visitTablePropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified. 
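+   * For example, the property list `('propA', 'propB')` yields `Seq("propA", "propB")`, while
+   * `('propA' = 'v')` is rejected because a value is supplied. (The names here are illustrative.)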
+   */
+  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v != null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props.keys.toSeq
+  }
+
+  /**
+   * A table property key can either be a String or a collection of dot-separated elements. This
+   * function extracts the property key based on whether it's a string literal or a table property
+   * identifier.
+   */
+  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+    if (key.STRING != null) {
+      string(key.STRING)
+    } else {
+      key.getText
+    }
+  }
+
+  /**
+   * A table property value can be a String, Integer, Boolean or Decimal. This function extracts
+   * the property value based on whether it's a string, integer, boolean or decimal literal.
+   */
+  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.STRING != null) {
+      string(value.STRING)
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+
+  /**
+   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
+   */
+  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
+
+  /**
+   * Validate a create table statement and return the [[TableIdentifier]].
+   */
+  override def visitCreateTableHeader(
+      ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
+    val temporary = ctx.TEMPORARY != null
+    val ifNotExists = ctx.EXISTS != null
+    if (temporary && ifNotExists) {
+      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
+    }
+    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
+  }
+
+  /**
+   * Create a table, returning a [[CreateTableStatement]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   USING table_provider
+   *   create_table_clauses
+   *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [OPTIONS table_property_list]
+   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
+   * }}}
+   */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+    if (external) {
+      operationNotAllowed("CREATE EXTERNAL TABLE ... 
USING", ctx) + } + + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) + checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) + + val schema = Option(ctx.colTypeList()).map(createSchema) + val partitionCols: Seq[String] = + Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil) + val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) + val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + + val provider = ctx.tableProvider.qualifiedName.getText + val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec) + val comment = Option(ctx.comment).map(string) + + Option(ctx.query).map(plan) match { + case Some(_) if temp => + operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx) + + case Some(_) if schema.isDefined => + operationNotAllowed( + "Schema may not be specified in a Create Table As Select (CTAS) statement", + ctx) + + case Some(query) => + CreateTableAsSelectStatement( + table, query, partitionCols, bucketSpec, properties, provider, options, location, comment, + ifNotExists = ifNotExists) + + case None if temp => + // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser. + // Use CREATE TEMPORARY VIEW ... USING ... instead. + operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) + + case _ => + CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec, + properties, provider, options, location, comment, ifNotExists = ifNotExists) + } + } + } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala new file mode 100644 index 0000000000000..c734968e838db --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.StructType + +/** + * A CREATE TABLE command, as parsed from SQL. 
+ *
+ * This is a metadata-only command and is not used to write data to the created table.
+ */
+case class CreateTableStatement(
+    table: TableIdentifier,
+    tableSchema: StructType,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
+
+/**
+ * A CREATE TABLE AS SELECT command, as parsed from SQL.
+ */
+case class CreateTableAsSelectStatement(
+    table: TableIdentifier,
+    asSelect: LogicalPlan,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq(asSelect)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala
new file mode 100644
index 0000000000000..510f2a1ba1e6d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A logical plan node that contains exactly what was parsed from SQL.
+ *
+ * This is used to hold information parsed from SQL when there are multiple implementations of a
+ * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2.
+ * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then
+ * converting that v1 metadata to the v2 equivalent, the sql [[CreateTableStatement]] plan is
+ * produced by the parser and converted once into both implementations.
+ *
+ * Parsed logical plans are not resolved because they must be converted to concrete logical plans.
+ *
+ * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible can
+ * be kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
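+ *
+ * For example, the [[CreateTableStatement]] parsed for a v1 table is converted into the existing
+ * `CreateTable` plan by the DataSourceResolution rule added in this change.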
+ */ +private[sql] abstract class ParsedStatement extends LogicalPlan { + // Redact properties and options when parsed nodes are used by generic methods like toString + override def productIterator: Iterator[Any] = super.productIterator.map { + case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]]) + case other => other + } + + final override lazy val resolved = false +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala new file mode 100644 index 0000000000000..dae8f582c7716 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.AnalysisTest +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class DDLParserSuite extends AnalysisTest { + import CatalystSqlParser._ + + private def intercept(sqlCommand: String, messages: String*): Unit = { + val e = intercept[ParseException](parsePlan(sqlCommand)) + messages.foreach { message => + assert(e.message.contains(message)) + } + } + + test("create table using - schema") { + val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + + intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet", + "no viable alternative at input") + } + + test("create table - with IF NOT EXISTS") { + val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + 
assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with partitioned by") { + val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + + "USING parquet PARTITIONED BY (a)" + + parsePlan(query) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)) + assert(create.partitioning == Seq("a")) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with bucket") { + val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" + + parsePlan(query) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b")))) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.contains("abc")) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with table properties") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties == Map("test" -> "test")) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + 
assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.contains("/tmp/file")) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - byte length literal table name") { + val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("2g", Some("1m"))) + assert(create.tableSchema == new StructType().add("a", IntegerType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Duplicate clauses - create table") { + def createTableHeader(duplicateClause: String): String = { + s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause" + } + + intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"), + "Found duplicate clauses: TBLPROPERTIES") + intercept(createTableHeader("LOCATION '/tmp/file'"), + "Found duplicate clauses: LOCATION") + intercept(createTableHeader("COMMENT 'a table'"), + "Found duplicate clauses: COMMENT") + intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"), + "Found duplicate clauses: CLUSTERED BY") + intercept(createTableHeader("PARTITIONED BY (b)"), + "Found duplicate clauses: PARTITIONED BY") + } + + test("support for other types in OPTIONS") { + val sql = + """ + |CREATE TABLE table_name USING json + |OPTIONS (a 1, b 0.1, c TRUE) + """.stripMargin + + parsePlan(sql) match { + case create: CreateTableStatement => + assert(create.table == TableIdentifier("table_name")) + assert(create.tableSchema == new StructType) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "json") + assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true")) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Test CTAS against native tables") { + val s1 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s2 = + """ 
+ |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |LOCATION '/user/external/page_view' + |COMMENT 'This is the staging page view table' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s3 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + checkParsing(s1) + checkParsing(s2) + checkParsing(s3) + + def checkParsing(sql: String): Unit = { + parsePlan(sql) match { + case create: CreateTableAsSelectStatement => + assert(create.table == TableIdentifier("page_view", Some("mydb"))) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties == Map("p1" -> "v1", "p2" -> "v2")) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.contains("/user/external/page_view")) + assert(create.comment.contains("This is the staging page view table")) + assert(create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelectStatement].getClass.getName} " + + s"from query, got ${other.getClass.getName}: $sql") + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a9a5e6ec67e43..735c0a5758d93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -25,7 +25,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -376,128 +376,46 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { DescribeQueryCommand(visitQueryToDesc(ctx.queryToDesc())) } - /** - * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). - */ - type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean) - - /** - * Validate a create table statement and return the [[TableIdentifier]]. - */ - override def visitCreateTableHeader( - ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) { - val temporary = ctx.TEMPORARY != null - val ifNotExists = ctx.EXISTS != null - if (temporary && ifNotExists) { - operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx) - } - (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null) - } - /** * Create a table, returning a [[CreateTable]] logical plan. * - * Expected format: - * {{{ - * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name - * USING table_provider - * create_table_clauses - * [[AS] select_statement]; + * This is used to produce CreateTempViewUsing from CREATE TEMPORARY TABLE. * - * create_table_clauses (order insensitive): - * [OPTIONS table_property_list] - * [PARTITIONED BY (col_name, col_name, ...)] - * [CLUSTERED BY (col_name, col_name, ...) 
- * [SORTED BY (col_name [ASC|DESC], ...)] - * INTO num_buckets BUCKETS - * ] - * [LOCATION path] - * [COMMENT table_comment] - * [TBLPROPERTIES (property_name=property_value, ...)] - * }}} + * TODO: Remove this. It is used because CreateTempViewUsing is not a Catalyst plan. + * Either move CreateTempViewUsing into catalyst as a parsed logical plan, or remove it because + * it is deprecated. */ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) - if (external) { - operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx) - } - - checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) - checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) - checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) - checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) - checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) - checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) - - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) - val provider = ctx.tableProvider.qualifiedName.getText - val schema = Option(ctx.colTypeList()).map(createSchema) - val partitionColumnNames = - Option(ctx.partitionColumnNames) - .map(visitIdentifierList(_).toArray) - .getOrElse(Array.empty[String]) - val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) - val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) - - val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec) - val storage = DataSource.buildStorageFormatFromOptions(options) - if (location.isDefined && storage.locationUri.isDefined) { - throw new ParseException( - "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + - "you can only specify one of them.", ctx) - } - val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) - - val tableType = if (customLocation.isDefined) { - CatalogTableType.EXTERNAL + if (!temp || ctx.query != null) { + super.visitCreateTable(ctx) } else { - CatalogTableType.MANAGED - } - - val tableDesc = CatalogTable( - identifier = table, - tableType = tableType, - storage = storage.copy(locationUri = customLocation), - schema = schema.getOrElse(new StructType), - provider = Some(provider), - partitionColumnNames = partitionColumnNames, - bucketSpec = bucketSpec, - properties = properties, - comment = Option(ctx.comment).map(string)) - - // Determine the storage mode. - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - if (ctx.query != null) { - // Get the backing query. - val query = plan(ctx.query) - - if (temp) { - operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx) + if (external) { + operationNotAllowed("CREATE EXTERNAL TABLE ... 
USING", ctx) } - // Don't allow explicit specification of schema for CTAS - if (schema.nonEmpty) { - operationNotAllowed( - "Schema may not be specified in a Create Table As Select (CTAS) statement", - ctx) - } - CreateTable(tableDesc, mode, Some(query)) - } else { - if (temp) { - if (ifNotExists) { - operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) - } + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) + checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) - logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + - "CREATE TEMPORARY VIEW ... USING ... instead") + if (ifNotExists) { // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support // IF NOT EXISTS. Users are not allowed to replace the existing temp table. - CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) - } else { - CreateTable(tableDesc, mode, None) + operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) } + + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + val provider = ctx.tableProvider.qualifiedName.getText + val schema = Option(ctx.colTypeList()).map(createSchema) + + logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + + "CREATE TEMPORARY VIEW ... USING ... instead") + + CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) } } @@ -562,77 +480,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "MSCK REPAIR TABLE") } - /** - * Convert a table property list into a key-value map. - * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. - */ - override def visitTablePropertyList( - ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.tableProperty.asScala.map { property => - val key = visitTablePropertyKey(property.key) - val value = visitTablePropertyValue(property.value) - key -> value - } - // Check for duplicate property names. - checkDuplicateKeys(properties, ctx) - properties.toMap - } - - /** - * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. - */ - private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { - val props = visitTablePropertyList(ctx) - val badKeys = props.collect { case (key, null) => key } - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props - } - - /** - * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified. - */ - private def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = { - val props = visitTablePropertyList(ctx) - val badKeys = props.filter { case (_, v) => v != null }.keys - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props.keys.toSeq - } - - /** - * A table property key can either be String or a collection of dot separated elements. This - * function extracts the property key based on whether its a string literal or a table property - * identifier. 
- */ - override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) - } else { - key.getText - } - } - - /** - * A table property value can be String, Integer, Boolean or Decimal. This function extracts - * the property value based on whether its a string, integer, boolean or decimal literal. - */ - override def visitTablePropertyValue(value: TablePropertyValueContext): String = { - if (value == null) { - null - } else if (value.STRING != null) { - string(value.STRING) - } else if (value.booleanValue != null) { - value.getText.toLowerCase(Locale.ROOT) - } else { - value.getText - } - } - /** * Create a [[CreateDatabaseCommand]] command. * @@ -1006,34 +853,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { newColumn = visitColType(ctx.colType)) } - /** - * Create location string. - */ - override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { - string(ctx.STRING) - } - - /** - * Create a [[BucketSpec]]. - */ - override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { - BucketSpec( - ctx.INTEGER_VALUE.getText.toInt, - visitIdentifierList(ctx.identifierList), - Option(ctx.orderedIdentifierList) - .toSeq - .flatMap(_.orderedIdentifier.asScala) - .map { orderedIdCtx => - Option(orderedIdCtx.ordering).map(_.getText).foreach { dir => - if (dir.toLowerCase(Locale.ROOT) != "asc") { - operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx) - } - } - - orderedIdCtx.identifier.getText - }) - } - /** * Convert a nested constants list into a sequence of string sequences. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala new file mode 100644 index 0000000000000..9fd44ea4e6379 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.util.Locale + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.TableProvider +import org.apache.spark.sql.types.StructType + +case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case CreateTableStatement( + table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, None) + + case CreateTableAsSelectStatement( + table, query, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, + properties, provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, Some(query)) + } + + object V1WriteProvider { + private val v1WriteOverrideSet = + conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet + + def unapply(provider: String): Option[String] = { + if (v1WriteOverrideSet.contains(provider.toLowerCase(Locale.ROOT))) { + Some(provider) + } else { + lazy val providerClass = DataSource.lookupDataSource(provider, conf) + provider match { + case _ if classOf[TableProvider].isAssignableFrom(providerClass) => + None + case _ => + Some(provider) + } + } + } + } + + private def buildCatalogTable( + table: TableIdentifier, + schema: StructType, + partitionColumnNames: Seq[String], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean): CatalogTable = { + + val storage = DataSource.buildStorageFormatFromOptions(options) + if (location.isDefined && storage.locationUri.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) + + val tableType = if (customLocation.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + + CatalogTable( + identifier = table, + tableType = tableType, + storage = storage.copy(locationUri = customLocation), + schema = schema, + provider = Some(provider), + partitionColumnNames = partitionColumnNames, + bucketSpec = bucketSpec, + properties = properties, + comment = comment) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index f05aa5113e03a..d5543e8a31aad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -161,6 +161,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallbackOrcDataSourceV2(session) +: + DataSourceResolution(conf) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 425a96b871ad2..be3d0794d4036 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -215,19 +215,6 @@ class SparkSqlParserSuite extends AnalysisTest { "no viable alternative at input") } - test("create table using - schema") { - assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet", - createTableUsing( - table = "my_tab", - schema = (new StructType) - .add("a", IntegerType, nullable = true, "test") - .add("b", StringType) - ) - ) - intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet", - "no viable alternative at input") - } - test("create view as insert into table") { // Single insert query intercept("CREATE VIEW testView AS INSERT INTO jt VALUES(1, 1)", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index e0ccae15f1d05..d430eeb294e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -415,173 +415,28 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } - test("create table - with partitioned by") { - val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + - "USING parquet PARTITIONED BY (a)" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType() - .add("a", IntegerType, nullable = true, "test") - .add("b", StringType), - provider = Some("parquet"), - partitionColumnNames = Seq("a") - ) - - parser.parsePlan(query) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } - } - - test("create table - with bucket") { - val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + - "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b"))) - ) - - parser.parsePlan(query) match { - case CreateTable(tableDesc, 
_, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } - } - - test("create table - with comment") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - comment = Some("abc")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("Duplicate clauses - create hive table") { + def createTableHeader(duplicateClause: String): String = { + s"CREATE TABLE my_tab(a INT, b STRING) STORED AS parquet $duplicateClause $duplicateClause" } - } - - test("create table - with table properties") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - properties = Map("test" -> "test")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Duplicate clauses - create table") { - def createTableHeader(duplicateClause: String, isNative: Boolean): String = { - val fileFormat = if (isNative) "USING parquet" else "STORED AS parquet" - s"CREATE TABLE my_tab(a INT, b STRING) $fileFormat $duplicateClause $duplicateClause" - } - - Seq(true, false).foreach { isNative => - intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')", isNative), - "Found duplicate clauses: TBLPROPERTIES") - intercept(createTableHeader("LOCATION '/tmp/file'", isNative), - "Found duplicate clauses: LOCATION") - intercept(createTableHeader("COMMENT 'a table'", isNative), - "Found duplicate clauses: COMMENT") - intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS", isNative), - "Found duplicate clauses: CLUSTERED BY") - } - - // Only for native data source tables - intercept(createTableHeader("PARTITIONED BY (b)", isNative = true), + intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"), + "Found duplicate clauses: TBLPROPERTIES") + intercept(createTableHeader("LOCATION '/tmp/file'"), + "Found duplicate clauses: LOCATION") + intercept(createTableHeader("COMMENT 'a table'"), + "Found duplicate clauses: COMMENT") + intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"), + "Found duplicate clauses: CLUSTERED BY") + intercept(createTableHeader("PARTITIONED BY (k int)"), "Found duplicate clauses: PARTITIONED BY") - - // Only for Hive serde tables - intercept(createTableHeader("PARTITIONED BY (k int)", isNative = false), - "Found duplicate clauses: PARTITIONED BY") - intercept(createTableHeader("STORED AS parquet", isNative 
= false), + intercept(createTableHeader("STORED AS parquet"), "Found duplicate clauses: STORED AS/BY") intercept( - createTableHeader("ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'", isNative = false), + createTableHeader("ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'"), "Found duplicate clauses: ROW FORMAT") } - test("create table - with location") { - val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet")) - - parser.parsePlan(v1) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $v1") - } - - val v2 = - """ - |CREATE TABLE my_tab(a INT, b STRING) - |USING parquet - |OPTIONS (path '/tmp/file') - |LOCATION '/tmp/file' - """.stripMargin - val e = intercept[ParseException] { - parser.parsePlan(v2) - } - assert(e.message.contains("you can only specify one of them.")) - } - - test("create table - byte length literal table name") { - val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("2g", Some("1m")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType), - provider = Some("parquet")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - test("insert overwrite directory") { val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a" parser.parsePlan(v1) match { @@ -1165,84 +1020,6 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { comparePlans(parsed, expected) } - test("support for other types in OPTIONS") { - val sql = - """ - |CREATE TABLE table_name USING json - |OPTIONS (a 1, b 0.1, c TRUE) - """.stripMargin - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("table_name"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true") - ), - schema = new StructType, - provider = Some("json") - ) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Test CTAS against data source tables") { - val s1 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val s2 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |LOCATION '/user/external/page_view' - |COMMENT 'This is the staging page view table' - |TBLPROPERTIES 
('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val s3 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - checkParsing(s1) - checkParsing(s2) - checkParsing(s3) - - def checkParsing(sql: String): Unit = { - val (desc, exists) = extractTableDesc(sql) - assert(exists) - assert(desc.identifier.database == Some("mydb")) - assert(desc.identifier.table == "page_view") - assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) - assert(desc.schema.isEmpty) // will be populated later when the table is actually created - assert(desc.comment == Some("This is the staging page view table")) - assert(desc.viewText.isEmpty) - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.partitionColumnNames.isEmpty) - assert(desc.provider == Some("parquet")) - assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) - } - } - test("Test CTAS #1") { val s1 = """ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala new file mode 100644 index 0000000000000..89c5df0900b61 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import java.net.URI + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.AnalysisTest +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class PlanResolutionSuite extends AnalysisTest { + import CatalystSqlParser._ + + def parseAndResolve(query: String): LogicalPlan = { + DataSourceResolution(conf).apply(parsePlan(query)) + } + + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { + parseAndResolve(sql).collect { + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + }.head + } + + test("create table - with partitioned by") { + val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + + "USING parquet PARTITIONED BY (a)" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType), + provider = Some("parquet"), + partitionColumnNames = Seq("a") + ) + + parseAndResolve(query) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with bucket") { + val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b"))) + ) + + parseAndResolve(query) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + comment = Some("abc")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with table properties") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" + + val expectedTableDesc 
= CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + properties = Map("test" -> "test")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet")) + + parseAndResolve(v1) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $v1") + } + + val v2 = + """ + |CREATE TABLE my_tab(a INT, b STRING) + |USING parquet + |OPTIONS (path '/tmp/file') + |LOCATION '/tmp/file' + """.stripMargin + val e = intercept[AnalysisException] { + parseAndResolve(v2) + } + assert(e.message.contains("you can only specify one of them.")) + } + + test("create table - byte length literal table name") { + val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("2g", Some("1m")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType), + provider = Some("parquet")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("support for other types in OPTIONS") { + val sql = + """ + |CREATE TABLE table_name USING json + |OPTIONS (a 1, b 0.1, c TRUE) + """.stripMargin + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("table_name"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy( + properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true") + ), + schema = new StructType, + provider = Some("json") + ) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Test CTAS against data source tables") { + val s1 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s2 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |LOCATION '/user/external/page_view' + |COMMENT 'This is the staging page view table' + |TBLPROPERTIES 
('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s3 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + checkParsing(s1) + checkParsing(s2) + checkParsing(s3) + + def checkParsing(sql: String): Unit = { + val (desc, exists) = extractTableDesc(sql) + assert(exists) + assert(desc.identifier.database.contains("mydb")) + assert(desc.identifier.table == "page_view") + assert(desc.storage.locationUri.contains(new URI("/user/external/page_view"))) + assert(desc.schema.isEmpty) // will be populated later when the table is actually created + assert(desc.comment.contains("This is the staging page view table")) + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.provider.contains("parquet")) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 68f4b2ddbac0b..877a0dadf0b03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -73,6 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallbackOrcDataSourceV2(session) +: + DataSourceResolution(conf) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
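To see the new two-step path end to end, one can mirror the parseAndResolve helper from the new PlanResolutionSuite: parse SQL into a CreateTableStatement, then apply DataSourceResolution. A minimal sketch follows; the table name, column, and location are illustrative, and `new SQLConf` stands in for a real session's conf:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.execution.datasources.DataSourceResolution
    import org.apache.spark.sql.internal.SQLConf

    // Parse SQL into the new catalyst CreateTableStatement, then let the rule
    // convert it into a v1 CreateTable, as the analyzer does once the rule is
    // registered in the session state builders above.
    val conf = new SQLConf
    val parsed = CatalystSqlParser.parsePlan(
      "CREATE TABLE mydb.page(a INT) USING parquet LOCATION '/tmp/page'")
    val resolved = DataSourceResolution(conf).apply(parsed)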