From aedce480186fee0d3e5742de32f0632f9522abf8 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 8 Mar 2019 12:09:12 -0800 Subject: [PATCH 01/12] SPARK-27108: Add parsed SQL plans for create, CTAS. --- .../sql/catalyst/parser/AstBuilder.scala | 192 ++++++++++++++- .../plans/logical/sql/CreateTable.scala | 64 +++++ .../plans/logical/sql/ParsedLogicalPlan.scala | 38 +++ .../spark/sql/execution/SparkSqlParser.scala | 233 ++---------------- .../datasources/DataSourceStrategy.scala | 76 +++++- 5 files changed, 394 insertions(+), 209 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 38a61b8731778..8756018cda933 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTable, CreateTableAsSelect} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true) if (STRING == null) structField else structField.withComment(string(STRING)) } + + /** + * Create location string. + */ + override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { + string(ctx.STRING) + } + + /** + * Create a [[BucketSpec]]. + */ + override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { + BucketSpec( + ctx.INTEGER_VALUE.getText.toInt, + visitIdentifierList(ctx.identifierList), + Option(ctx.orderedIdentifierList) + .toSeq + .flatMap(_.orderedIdentifier.asScala) + .map { orderedIdCtx => + Option(orderedIdCtx.ordering).map(_.getText).foreach { dir => + if (dir.toLowerCase(Locale.ROOT) != "asc") { + operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx) + } + } + + orderedIdCtx.identifier.getText + }) + } + + /** + * Convert a table property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. 
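+   * For example, `('p1' = 'v1', 'p2' = 'v2')` parses to `Map("p1" -> "v1", "p2" -> "v2")`.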
+ */ + override def visitTablePropertyList( + ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.tableProperty.asScala.map { property => + val key = visitTablePropertyKey(property.key) + val value = visitTablePropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { + val props = visitTablePropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified. + */ + def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = { + val props = visitTablePropertyList(ctx) + val badKeys = props.filter { case (_, v) => v != null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props.keys.toSeq + } + + /** + * A table property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether its a string literal or a table property + * identifier. + */ + override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A table property value can be String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether its a string, integer, boolean or decimal literal. + */ + override def visitTablePropertyValue(value: TablePropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } + + /** + * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). + */ + type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean) + + /** + * Validate a create table statement and return the [[TableIdentifier]]. + */ + override def visitCreateTableHeader( + ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) { + val temporary = ctx.TEMPORARY != null + val ifNotExists = ctx.EXISTS != null + if (temporary && ifNotExists) { + operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx) + } + (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null) + } + + /** + * Create a table, returning a [[CreateTable]] logical plan. + * + * Expected format: + * {{{ + * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name + * USING table_provider + * create_table_clauses + * [[AS] select_statement]; + * + * create_table_clauses (order insensitive): + * [OPTIONS table_property_list] + * [PARTITIONED BY (col_name, col_name, ...)] + * [CLUSTERED BY (col_name, col_name, ...) 
+ * [SORTED BY (col_name [ASC|DESC], ...)] + * INTO num_buckets BUCKETS + * ] + * [LOCATION path] + * [COMMENT table_comment] + * [TBLPROPERTIES (property_name=property_value, ...)] + * }}} + */ + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + if (external) { + operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx) + } + + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) + checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) + + val schema = Option(ctx.colTypeList()).map(createSchema) + val partitionCols: Seq[String] = + Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil) + val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) + val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + + val provider = ctx.tableProvider.qualifiedName.getText + val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec) + val comment = Option(ctx.comment).map(string) + + Option(ctx.query).map(plan) match { + case Some(_) if temp => + operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx) + + case Some(_) if schema.isDefined => + operationNotAllowed( + "Schema may not be specified in a Create Table As Select (CTAS) statement", + ctx) + + case Some(query) => + CreateTableAsSelect( + table, query, partitionCols, bucketSpec, properties, provider, options, location, comment, + ifNotExists = ifNotExists) + + case None if temp => + // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser. + // Use CREATE TEMPORARY VIEW ... USING ... instead. + operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) + + case _ => + CreateTable(table, schema.getOrElse(new StructType), partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists = ifNotExists) + } + } + } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala new file mode 100644 index 0000000000000..715ccaf1ec8ef --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.StructType + +/** + * A CREATE TABLE command, as parsed from SQL. + */ +case class CreateTable( + table: TableIdentifier, + tableSchema: StructType, + partitioning: Seq[String], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean) extends ParsedLogicalPlan { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq.empty +} + +/** + * A CREATE TABLE AS SELECT command, as parsed from SQL. + */ +case class CreateTableAsSelect( + table: TableIdentifier, + asSelect: LogicalPlan, + partitioning: Seq[String], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean) extends ParsedLogicalPlan { + + override def output: Seq[Attribute] = Seq.empty + + override def children: Seq[LogicalPlan] = Seq(asSelect) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala new file mode 100644 index 0000000000000..d1625fba92700 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * A logical plan node that contains exactly what was parsed from SQL. + * + * This is used to hold information parsed from SQL when there are multiple implementations of a + * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2. + * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then + * converting that v1 metadata to the v2 equivalent, the sql [[CreateTable]] plan is produced by + * the parser and converted once into both implementations. + * + * Parsed logical plans are not resolved because they must be converted to concrete logical plans. + * + * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be + * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. 
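+ *
+ * Because a parsed plan is never resolved, analysis fails if no rule converts it into a
+ * concrete plan.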
+ */ +private[sql] abstract class ParsedLogicalPlan extends LogicalPlan { + override lazy val resolved = false +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a9a5e6ec67e43..735c0a5758d93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -25,7 +25,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -376,128 +376,46 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { DescribeQueryCommand(visitQueryToDesc(ctx.queryToDesc())) } - /** - * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). - */ - type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean) - - /** - * Validate a create table statement and return the [[TableIdentifier]]. - */ - override def visitCreateTableHeader( - ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) { - val temporary = ctx.TEMPORARY != null - val ifNotExists = ctx.EXISTS != null - if (temporary && ifNotExists) { - operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx) - } - (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null) - } - /** * Create a table, returning a [[CreateTable]] logical plan. * - * Expected format: - * {{{ - * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name - * USING table_provider - * create_table_clauses - * [[AS] select_statement]; + * This is used to produce CreateTempViewUsing from CREATE TEMPORARY TABLE. * - * create_table_clauses (order insensitive): - * [OPTIONS table_property_list] - * [PARTITIONED BY (col_name, col_name, ...)] - * [CLUSTERED BY (col_name, col_name, ...) - * [SORTED BY (col_name [ASC|DESC], ...)] - * INTO num_buckets BUCKETS - * ] - * [LOCATION path] - * [COMMENT table_comment] - * [TBLPROPERTIES (property_name=property_value, ...)] - * }}} + * TODO: Remove this. It is used because CreateTempViewUsing is not a Catalyst plan. + * Either move CreateTempViewUsing into catalyst as a parsed logical plan, or remove it because + * it is deprecated. */ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) - if (external) { - operationNotAllowed("CREATE EXTERNAL TABLE ... 
USING", ctx) - } - - checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) - checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) - checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) - checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) - checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) - checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) - - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) - val provider = ctx.tableProvider.qualifiedName.getText - val schema = Option(ctx.colTypeList()).map(createSchema) - val partitionColumnNames = - Option(ctx.partitionColumnNames) - .map(visitIdentifierList(_).toArray) - .getOrElse(Array.empty[String]) - val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) - val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) - - val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec) - val storage = DataSource.buildStorageFormatFromOptions(options) - if (location.isDefined && storage.locationUri.isDefined) { - throw new ParseException( - "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + - "you can only specify one of them.", ctx) - } - val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) - - val tableType = if (customLocation.isDefined) { - CatalogTableType.EXTERNAL + if (!temp || ctx.query != null) { + super.visitCreateTable(ctx) } else { - CatalogTableType.MANAGED - } - - val tableDesc = CatalogTable( - identifier = table, - tableType = tableType, - storage = storage.copy(locationUri = customLocation), - schema = schema.getOrElse(new StructType), - provider = Some(provider), - partitionColumnNames = partitionColumnNames, - bucketSpec = bucketSpec, - properties = properties, - comment = Option(ctx.comment).map(string)) - - // Determine the storage mode. - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - if (ctx.query != null) { - // Get the backing query. - val query = plan(ctx.query) - - if (temp) { - operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx) + if (external) { + operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx) } - // Don't allow explicit specification of schema for CTAS - if (schema.nonEmpty) { - operationNotAllowed( - "Schema may not be specified in a Create Table As Select (CTAS) statement", - ctx) - } - CreateTable(tableDesc, mode, Some(query)) - } else { - if (temp) { - if (ifNotExists) { - operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) - } + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) + checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) - logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + - "CREATE TEMPORARY VIEW ... USING ... instead") + if (ifNotExists) { // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support // IF NOT EXISTS. Users are not allowed to replace the existing temp table. 
- CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) - } else { - CreateTable(tableDesc, mode, None) + operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) } + + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + val provider = ctx.tableProvider.qualifiedName.getText + val schema = Option(ctx.colTypeList()).map(createSchema) + + logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + + "CREATE TEMPORARY VIEW ... USING ... instead") + + CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) } } @@ -562,77 +480,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "MSCK REPAIR TABLE") } - /** - * Convert a table property list into a key-value map. - * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. - */ - override def visitTablePropertyList( - ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.tableProperty.asScala.map { property => - val key = visitTablePropertyKey(property.key) - val value = visitTablePropertyValue(property.value) - key -> value - } - // Check for duplicate property names. - checkDuplicateKeys(properties, ctx) - properties.toMap - } - - /** - * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. - */ - private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { - val props = visitTablePropertyList(ctx) - val badKeys = props.collect { case (key, null) => key } - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props - } - - /** - * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified. - */ - private def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = { - val props = visitTablePropertyList(ctx) - val badKeys = props.filter { case (_, v) => v != null }.keys - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props.keys.toSeq - } - - /** - * A table property key can either be String or a collection of dot separated elements. This - * function extracts the property key based on whether its a string literal or a table property - * identifier. - */ - override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) - } else { - key.getText - } - } - - /** - * A table property value can be String, Integer, Boolean or Decimal. This function extracts - * the property value based on whether its a string, integer, boolean or decimal literal. - */ - override def visitTablePropertyValue(value: TablePropertyValueContext): String = { - if (value == null) { - null - } else if (value.STRING != null) { - string(value.STRING) - } else if (value.booleanValue != null) { - value.getText.toLowerCase(Locale.ROOT) - } else { - value.getText - } - } - /** * Create a [[CreateDatabaseCommand]] command. * @@ -1006,34 +853,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { newColumn = visitColType(ctx.colType)) } - /** - * Create location string. - */ - override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { - string(ctx.STRING) - } - - /** - * Create a [[BucketSpec]]. 
- */ - override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { - BucketSpec( - ctx.INTEGER_VALUE.getText.toInt, - visitIdentifierList(ctx.identifierList), - Option(ctx.orderedIdentifierList) - .toSeq - .flatMap(_.orderedIdentifier.asScala) - .map { orderedIdCtx => - Option(orderedIdCtx.ordering).map(_.getText).foreach { dir => - if (dir.toLowerCase(Locale.ROOT) != "asc") { - operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx) - } - } - - orderedIdCtx.identifier.getText - }) - } - /** * Convert a nested constants list into a sequence of string sequences. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4c699276135e0..4fc2ff6c36e68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -34,11 +34,13 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.sql import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ +import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -132,6 +134,26 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case sql.CreateTable( + table, schema, partitionCols, bucketSpec, properties, V1Provider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, None) + + case sql.CreateTableAsSelect( + table, query, partitionCols, bucketSpec, properties, V1Provider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, query.schema, partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, Some(query)) + case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) @@ -209,6 +231,58 @@ case class DataSourceAnalysis(conf: 
SQLConf) extends Rule[LogicalPlan] with Cast Some(t.location), actualQuery.output.map(_.name)) } + + object V1Provider { + def unapply(provider: String): Option[String] = { + lazy val providerClass = DataSource.lookupDataSource(provider, conf) + provider match { + case "hive" => + None + case _ if classOf[TableProvider].isAssignableFrom(providerClass) => + None + case _ => + Some(provider) + } + } + } + + private def buildCatalogTable( + table: TableIdentifier, + schema: StructType, + partitionColumnNames: Seq[String], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean): CatalogTable = { + + val storage = DataSource.buildStorageFormatFromOptions(options) + if (location.isDefined && storage.locationUri.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) + + val tableType = if (customLocation.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + + CatalogTable( + identifier = table, + tableType = tableType, + storage = storage.copy(locationUri = customLocation), + schema = schema, + provider = Some(provider), + partitionColumnNames = partitionColumnNames, + bucketSpec = bucketSpec, + properties = properties, + comment = comment) + } } From 4a89dd7d05dd4e042c188520aa29cf69936f21c5 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 13 Mar 2019 15:32:57 -0700 Subject: [PATCH 02/12] Update for review comments. * Make ParsedLogicalPlan.resovled final. * Add docs to CreateTable to be clear that it is metadata-only. --- .../spark/sql/catalyst/plans/logical/sql/CreateTable.scala | 2 ++ .../sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala index 715ccaf1ec8ef..3962910c38f4c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.types.StructType /** * A CREATE TABLE command, as parsed from SQL. + * + * This is a metadata-only command and is not used to write data to the created table. */ case class CreateTable( table: TableIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala index d1625fba92700..002497fc94a40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala @@ -34,5 +34,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. 
*/ private[sql] abstract class ParsedLogicalPlan extends LogicalPlan { - override lazy val resolved = false + final override lazy val resolved = false } From 69f9a29ba3fc12d42c5d2c38798622cec6a67091 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 13 Mar 2019 15:54:29 -0700 Subject: [PATCH 03/12] Move new DataSource resolution rules into DataSourceResolution. --- .../datasources/DataSourceResolution.scala | 105 ++++++++++++++++++ .../datasources/DataSourceStrategy.scala | 76 +------------ .../internal/BaseSessionStateBuilder.scala | 1 + .../sql/hive/HiveSessionStateBuilder.scala | 1 + 4 files changed, 108 insertions(+), 75 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala new file mode 100644 index 0000000000000..b48a625a69b8b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.sql +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.TableProvider +import org.apache.spark.sql.types.StructType + +case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case sql.CreateTable( + table, schema, partitionCols, bucketSpec, properties, V1Provider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, None) + + case sql.CreateTableAsSelect( + table, query, partitionCols, bucketSpec, properties, V1Provider(provider), options, + location, comment, ifNotExists) => + + val tableDesc = buildCatalogTable(table, query.schema, partitionCols, bucketSpec, properties, + provider, options, location, comment, ifNotExists) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + + CreateTable(tableDesc, mode, Some(query)) + } + + object V1Provider { + def unapply(provider: String): Option[String] = { + lazy val providerClass = DataSource.lookupDataSource(provider, conf) + provider match { + case "hive" => + None + case _ if classOf[TableProvider].isAssignableFrom(providerClass) => + None + case _ => + Some(provider) + } + } + } + + private def buildCatalogTable( + table: TableIdentifier, + schema: StructType, + partitionColumnNames: Seq[String], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean): CatalogTable = { + + val storage = DataSource.buildStorageFormatFromOptions(options) + if (location.isDefined && storage.locationUri.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) + + val tableType = if (customLocation.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + + CatalogTable( + identifier = table, + tableType = tableType, + storage = storage.copy(locationUri = customLocation), + schema = schema, + provider = Some(provider), + partitionColumnNames = partitionColumnNames, + bucketSpec = bucketSpec, + properties = properties, + comment = comment) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4fc2ff6c36e68..4c699276135e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ 
-25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -34,13 +34,11 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.plans.logical.sql import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ -import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -134,26 +132,6 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case sql.CreateTable( - table, schema, partitionCols, bucketSpec, properties, V1Provider(provider), options, - location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, None) - - case sql.CreateTableAsSelect( - table, query, partitionCols, bucketSpec, properties, V1Provider(provider), options, - location, comment, ifNotExists) => - - val tableDesc = buildCatalogTable(table, query.schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, Some(query)) - case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) @@ -231,58 +209,6 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast Some(t.location), actualQuery.output.map(_.name)) } - - object V1Provider { - def unapply(provider: String): Option[String] = { - lazy val providerClass = DataSource.lookupDataSource(provider, conf) - provider match { - case "hive" => - None - case _ if classOf[TableProvider].isAssignableFrom(providerClass) => - None - case _ => - Some(provider) - } - } - } - - private def buildCatalogTable( - table: TableIdentifier, - schema: StructType, - partitionColumnNames: Seq[String], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: String, - options: Map[String, String], - location: Option[String], - comment: Option[String], - ifNotExists: Boolean): CatalogTable = { - - val storage = DataSource.buildStorageFormatFromOptions(options) - if (location.isDefined && storage.locationUri.isDefined) { - throw new AnalysisException( - "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + - "you can only specify one of them.") - } - val 
customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) - - val tableType = if (customLocation.isDefined) { - CatalogTableType.EXTERNAL - } else { - CatalogTableType.MANAGED - } - - CatalogTable( - identifier = table, - tableType = tableType, - storage = storage.copy(locationUri = customLocation), - schema = schema, - provider = Some(provider), - partitionColumnNames = partitionColumnNames, - bucketSpec = bucketSpec, - properties = properties, - comment = comment) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index f05aa5113e03a..d5543e8a31aad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -161,6 +161,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallbackOrcDataSourceV2(session) +: + DataSourceResolution(conf) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 68f4b2ddbac0b..877a0dadf0b03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -73,6 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallbackOrcDataSourceV2(session) +: + DataSourceResolution(conf) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = From e6760954b5bb0f8a17d91a62b1b2d9c9eb768d85 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 14 Mar 2019 09:31:36 -0700 Subject: [PATCH 04/12] Fix CreateTableAsSelect conversion to CatalogTable. --- .../sql/execution/datasources/DataSourceResolution.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index b48a625a69b8b..35a357efd1657 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -44,8 +44,8 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca table, query, partitionCols, bucketSpec, properties, V1Provider(provider), options, location, comment, ifNotExists) => - val tableDesc = buildCatalogTable(table, query.schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) + val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, + properties, provider, options, location, comment, ifNotExists) val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, Some(query)) From 32551bef41281bd155389e410b0040369c44ca10 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 15 Mar 2019 13:37:54 -0700 Subject: [PATCH 05/12] Apply the v1 source writer list. 
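The diff below decides, per table provider, whether a parsed CREATE TABLE falls back to the
v1 CatalogTable path. A rough standalone sketch of that decision, with illustrative names
(the real check reads the writer list from SQLConf and tests the provider class against
TableProvider):

    object V1WriteFallback {
      // v1WriterList stands in for the configured v1 source writer list;
      // isV2TableProvider stands in for "the provider class implements TableProvider".
      def usesV1WritePath(
          provider: String,
          v1WriterList: Set[String],
          isV2TableProvider: Boolean): Boolean = {
        if (v1WriterList.contains(provider)) {
          true // explicitly pinned to the v1 write path by configuration
        } else {
          !isV2TableProvider // otherwise only non-v2 sources take the v1 path
        }
      }
    }

    // e.g. V1WriteFallback.usesV1WritePath("parquet", Set.empty, isV2TableProvider = true) == false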
--- .../datasources/DataSourceResolution.scala | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 35a357efd1657..d77c482b6aff7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case sql.CreateTable( - table, schema, partitionCols, bucketSpec, properties, V1Provider(provider), options, + table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, location, comment, ifNotExists) => val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, @@ -41,7 +41,7 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca CreateTable(tableDesc, mode, None) case sql.CreateTableAsSelect( - table, query, partitionCols, bucketSpec, properties, V1Provider(provider), options, + table, query, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, location, comment, ifNotExists) => val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, @@ -51,16 +51,20 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca CreateTable(tableDesc, mode, Some(query)) } - object V1Provider { + object V1WriteProvider { def unapply(provider: String): Option[String] = { - lazy val providerClass = DataSource.lookupDataSource(provider, conf) - provider match { - case "hive" => - None - case _ if classOf[TableProvider].isAssignableFrom(providerClass) => - None - case _ => - Some(provider) + if (conf.userV1SourceWriterList.contains(provider)) { + Some(provider) + } else { + lazy val providerClass = DataSource.lookupDataSource(provider, conf) + provider match { + case "hive" => + None + case _ if classOf[TableProvider].isAssignableFrom(providerClass) => + None + case _ => + Some(provider) + } } } } From 64c1f0111ed2ad6ee021abc8e7355e5b35db0bd1 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sat, 16 Mar 2019 12:29:57 -0700 Subject: [PATCH 06/12] Fix V1WriteProvider extraction for Hive sources. 
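For context (not part of the change below): V1WriteProvider is a Scala extractor, so what its
unapply returns controls whether the rewrite case in the resolution rule matches a plan at all.
A toy illustration with made-up provider names:

    object ExtractorDemo {
      object V1Write {
        // Toy predicate standing in for the real provider checks.
        def unapply(provider: String): Option[String] =
          if (provider == "foo") Some(provider) else None
      }

      def route(provider: String): String = provider match {
        case V1Write(p) => s"rewrite to the v1 command ($p)"
        case other => s"leave for other rules ($other)"
      }
      // route("foo") == "rewrite to the v1 command (foo)"
      // route("bar") == "leave for other rules (bar)"
    }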
--- .../spark/sql/execution/datasources/DataSourceResolution.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index d77c482b6aff7..3df2cca78b8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -58,8 +58,6 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca } else { lazy val providerClass = DataSource.lookupDataSource(provider, conf) provider match { - case "hive" => - None case _ if classOf[TableProvider].isAssignableFrom(providerClass) => None case _ => From f3fab1c3db99c37342f159cdf64546e0de23e88a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sat, 16 Mar 2019 16:59:47 -0700 Subject: [PATCH 07/12] Fix v1 override check to handle case sensitivity. --- .../sql/execution/datasources/DataSourceResolution.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 3df2cca78b8ea..fe8789e2a8ae3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport @@ -52,8 +54,11 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca } object V1WriteProvider { + private val v1WriteOverrideSet = + conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet + def unapply(provider: String): Option[String] = { - if (conf.userV1SourceWriterList.contains(provider)) { + if (v1WriteOverrideSet.contains(provider.toLowerCase(Locale.ROOT))) { Some(provider) } else { lazy val providerClass = DataSource.lookupDataSource(provider, conf) From 62ccd4d0bf279e5f01f187978c5b5d45c3ad0c03 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 18 Mar 2019 14:42:30 -0700 Subject: [PATCH 08/12] Fix DDLParserSuite. 
* Move CreateTable and CreateTableAsSelect tests to catalyst * Add PlanResolutionSuite to test parsing and resolution --- .../sql/catalyst/parser/DDLParserSuite.scala | 293 ++++++++++++++++++ .../execution/command/DDLParserSuite.scala | 251 +-------------- .../command/PlanResolutionSuite.scala | 257 +++++++++++++++ 3 files changed, 564 insertions(+), 237 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala new file mode 100644 index 0000000000000..18a70e2b9162b --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser + +import java.net.URI + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.AnalysisTest +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTable, CreateTableAsSelect} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class DDLParserSuite extends AnalysisTest { + import CatalystSqlParser._ + + private def intercept(sqlCommand: String, messages: String*): Unit = { + val e = intercept[ParseException](parsePlan(sqlCommand)) + messages.foreach { message => + assert(e.message.contains(message)) + } + } + + test("create table - with IF NOT EXISTS") { + val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with partitioned by") { + val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + + "USING parquet PARTITIONED BY (a)" + + parsePlan(query) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + 
assert(create.tableSchema == new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)) + assert(create.partitioning == Seq("a")) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with bucket") { + val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" + + parsePlan(query) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b")))) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.contains("abc")) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with table properties") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties == Map("test" -> "test")) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.contains("/tmp/file")) + assert(create.comment.isEmpty) + 
assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - byte length literal table name") { + val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("2g", Some("1m"))) + assert(create.tableSchema == new StructType().add("a", IntegerType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Duplicate clauses - create table") { + def createTableHeader(duplicateClause: String): String = { + s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause" + } + + intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"), + "Found duplicate clauses: TBLPROPERTIES") + intercept(createTableHeader("LOCATION '/tmp/file'"), + "Found duplicate clauses: LOCATION") + intercept(createTableHeader("COMMENT 'a table'"), + "Found duplicate clauses: COMMENT") + intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"), + "Found duplicate clauses: CLUSTERED BY") + intercept(createTableHeader("PARTITIONED BY (b)"), + "Found duplicate clauses: PARTITIONED BY") + } + + test("support for other types in OPTIONS") { + val sql = + """ + |CREATE TABLE table_name USING json + |OPTIONS (a 1, b 0.1, c TRUE) + """.stripMargin + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("table_name")) + assert(create.tableSchema == new StructType) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "json") + assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true")) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Test CTAS against native tables") { + val s1 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s2 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |LOCATION '/user/external/page_view' + |COMMENT 'This is the staging page view table' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s3 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + checkParsing(s1) + checkParsing(s2) + checkParsing(s3) + + def checkParsing(sql: String): Unit = { + parsePlan(sql) match { + case create: CreateTableAsSelect => + assert(create.table == TableIdentifier("page_view", Some("mydb"))) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties == Map("p1" -> 
"v1", "p2" -> "v2")) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.contains("/user/external/page_view")) + assert(create.comment.contains("This is the staging page view table")) + assert(create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelect].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index e0ccae15f1d05..d430eeb294e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -415,173 +415,28 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } - test("create table - with partitioned by") { - val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + - "USING parquet PARTITIONED BY (a)" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType() - .add("a", IntegerType, nullable = true, "test") - .add("b", StringType), - provider = Some("parquet"), - partitionColumnNames = Seq("a") - ) - - parser.parsePlan(query) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } - } - - test("create table - with bucket") { - val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + - "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b"))) - ) - - parser.parsePlan(query) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") - } - } - - test("create table - with comment") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - comment = Some("abc")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("Duplicate clauses - create hive table") { + def createTableHeader(duplicateClause: String): String = { + s"CREATE TABLE my_tab(a INT, b STRING) STORED AS parquet $duplicateClause $duplicateClause" } - } - - test("create 
table - with table properties") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet"), - properties = Map("test" -> "test")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Duplicate clauses - create table") { - def createTableHeader(duplicateClause: String, isNative: Boolean): String = { - val fileFormat = if (isNative) "USING parquet" else "STORED AS parquet" - s"CREATE TABLE my_tab(a INT, b STRING) $fileFormat $duplicateClause $duplicateClause" - } - - Seq(true, false).foreach { isNative => - intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')", isNative), - "Found duplicate clauses: TBLPROPERTIES") - intercept(createTableHeader("LOCATION '/tmp/file'", isNative), - "Found duplicate clauses: LOCATION") - intercept(createTableHeader("COMMENT 'a table'", isNative), - "Found duplicate clauses: COMMENT") - intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS", isNative), - "Found duplicate clauses: CLUSTERED BY") - } - - // Only for native data source tables - intercept(createTableHeader("PARTITIONED BY (b)", isNative = true), + intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"), + "Found duplicate clauses: TBLPROPERTIES") + intercept(createTableHeader("LOCATION '/tmp/file'"), + "Found duplicate clauses: LOCATION") + intercept(createTableHeader("COMMENT 'a table'"), + "Found duplicate clauses: COMMENT") + intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"), + "Found duplicate clauses: CLUSTERED BY") + intercept(createTableHeader("PARTITIONED BY (k int)"), "Found duplicate clauses: PARTITIONED BY") - - // Only for Hive serde tables - intercept(createTableHeader("PARTITIONED BY (k int)", isNative = false), - "Found duplicate clauses: PARTITIONED BY") - intercept(createTableHeader("STORED AS parquet", isNative = false), + intercept(createTableHeader("STORED AS parquet"), "Found duplicate clauses: STORED AS/BY") intercept( - createTableHeader("ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'", isNative = false), + createTableHeader("ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'"), "Found duplicate clauses: ROW FORMAT") } - test("create table - with location") { - val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("my_tab"), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), - schema = new StructType().add("a", IntegerType).add("b", StringType), - provider = Some("parquet")) - - parser.parsePlan(v1) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $v1") - } - - val v2 = - """ - |CREATE TABLE my_tab(a INT, b STRING) - |USING parquet - |OPTIONS (path '/tmp/file') - |LOCATION 
'/tmp/file' - """.stripMargin - val e = intercept[ParseException] { - parser.parsePlan(v2) - } - assert(e.message.contains("you can only specify one of them.")) - } - - test("create table - byte length literal table name") { - val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("2g", Some("1m")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", IntegerType), - provider = Some("parquet")) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - test("insert overwrite directory") { val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a" parser.parsePlan(v1) match { @@ -1165,84 +1020,6 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { comparePlans(parsed, expected) } - test("support for other types in OPTIONS") { - val sql = - """ - |CREATE TABLE table_name USING json - |OPTIONS (a 1, b 0.1, c TRUE) - """.stripMargin - - val expectedTableDesc = CatalogTable( - identifier = TableIdentifier("table_name"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true") - ), - schema = new StructType, - provider = Some("json") - ) - - parser.parsePlan(sql) match { - case CreateTable(tableDesc, _, None) => - assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) - case other => - fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } - } - - test("Test CTAS against data source tables") { - val s1 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val s2 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |LOCATION '/user/external/page_view' - |COMMENT 'This is the staging page view table' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - val s3 = - """ - |CREATE TABLE IF NOT EXISTS mydb.page_view - |USING parquet - |COMMENT 'This is the staging page view table' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src - """.stripMargin - - checkParsing(s1) - checkParsing(s2) - checkParsing(s3) - - def checkParsing(sql: String): Unit = { - val (desc, exists) = extractTableDesc(sql) - assert(exists) - assert(desc.identifier.database == Some("mydb")) - assert(desc.identifier.table == "page_view") - assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) - assert(desc.schema.isEmpty) // will be populated later when the table is actually created - assert(desc.comment == Some("This is the staging page view table")) - assert(desc.viewText.isEmpty) - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.partitionColumnNames.isEmpty) - assert(desc.provider == Some("parquet")) - assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) - } - } - test("Test CTAS #1") { val s1 = """ diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala new file mode 100644 index 0000000000000..89c5df0900b61 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import java.net.URI + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.AnalysisTest +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class PlanResolutionSuite extends AnalysisTest { + import CatalystSqlParser._ + + def parseAndResolve(query: String): LogicalPlan = { + DataSourceResolution(conf).apply(parsePlan(query)) + } + + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { + parseAndResolve(sql).collect { + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + }.head + } + + test("create table - with partitioned by") { + val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + + "USING parquet PARTITIONED BY (a)" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType), + provider = Some("parquet"), + partitionColumnNames = Seq("a") + ) + + parseAndResolve(query) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with bucket") { + val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b"))) + ) + + parseAndResolve(query) match { + case 
CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $query") + } + } + + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + comment = Some("abc")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with table properties") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + properties = Map("test" -> "test")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet")) + + parseAndResolve(v1) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $v1") + } + + val v2 = + """ + |CREATE TABLE my_tab(a INT, b STRING) + |USING parquet + |OPTIONS (path '/tmp/file') + |LOCATION '/tmp/file' + """.stripMargin + val e = intercept[AnalysisException] { + parseAndResolve(v2) + } + assert(e.message.contains("you can only specify one of them.")) + } + + test("create table - byte length literal table name") { + val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("2g", Some("1m")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType), + provider = Some("parquet")) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("support for other types in OPTIONS") { + val sql = + """ + |CREATE 
TABLE table_name USING json + |OPTIONS (a 1, b 0.1, c TRUE) + """.stripMargin + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("table_name"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy( + properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true") + ), + schema = new StructType, + provider = Some("json") + ) + + parseAndResolve(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Test CTAS against data source tables") { + val s1 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s2 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |LOCATION '/user/external/page_view' + |COMMENT 'This is the staging page view table' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val s3 = + """ + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + checkParsing(s1) + checkParsing(s2) + checkParsing(s3) + + def checkParsing(sql: String): Unit = { + val (desc, exists) = extractTableDesc(sql) + assert(exists) + assert(desc.identifier.database.contains("mydb")) + assert(desc.identifier.table == "page_view") + assert(desc.storage.locationUri.contains(new URI("/user/external/page_view"))) + assert(desc.schema.isEmpty) // will be populated later when the table is actually created + assert(desc.comment.contains("This is the staging page view table")) + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.provider.contains("parquet")) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) + } + } +} From e3444d8e5f557a461b37165310ac8e7141de6d8b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 18 Mar 2019 15:02:31 -0700 Subject: [PATCH 09/12] Fix password redaction tests. --- .../sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala index 002497fc94a40..6ddb1fbbc729e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala @@ -34,5 +34,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. 
*/ private[sql] abstract class ParsedLogicalPlan extends LogicalPlan { + // Redact properties and options when parsed nodes are used by generic methods like toString + override def productIterator: Iterator[Any] = super.productIterator.map { + case mapArg: Map[String, String] => conf.redactOptions(mapArg) + case other => other + } + final override lazy val resolved = false } From 0594a82ed61333ca0686d9ce44295fac24306e15 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 18 Mar 2019 15:08:27 -0700 Subject: [PATCH 10/12] Move another test into catalyst DDLParserSuite. --- .../sql/catalyst/parser/DDLParserSuite.scala | 31 +++++++++++++++++-- .../sql/execution/SparkSqlParserSuite.scala | 13 -------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 18a70e2b9162b..3a833c4a44ace 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.catalyst.parser -import java.net.URI - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTable, CreateTableAsSelect} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} @@ -35,6 +33,33 @@ class DDLParserSuite extends AnalysisTest { } } + test("create table using - schema") { + val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" + + parsePlan(sql) match { + case create: CreateTable => + assert(create.table == TableIdentifier("my_tab")) + assert(create.tableSchema == new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)) + assert(create.partitioning.isEmpty) + assert(create.bucketSpec.isEmpty) + assert(create.properties.isEmpty) + assert(create.provider == "parquet") + assert(create.options.isEmpty) + assert(create.location.isEmpty) + assert(create.comment.isEmpty) + assert(!create.ifNotExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + + intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet", + "no viable alternative at input") + } + test("create table - with IF NOT EXISTS") { val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 425a96b871ad2..be3d0794d4036 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -215,19 +215,6 @@ class SparkSqlParserSuite extends AnalysisTest { "no viable alternative at input") } - test("create table using - schema") { - assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet", - createTableUsing( - table = "my_tab", - schema = (new StructType) - .add("a", IntegerType, nullable = true, "test") - .add("b", 
StringType) - ) - ) - intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet", - "no viable alternative at input") - } - test("create view as insert into table") { // Single insert query intercept("CREATE VIEW testView AS INSERT INTO jt VALUES(1, 1)", From e0eebd078cd3e396af1d877cf445fbd424e46bec Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 18 Mar 2019 15:30:30 -0700 Subject: [PATCH 11/12] Fix redaction matcher for type erasure. --- .../sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala index 6ddb1fbbc729e..7f008f4ca1c6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan private[sql] abstract class ParsedLogicalPlan extends LogicalPlan { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { - case mapArg: Map[String, String] => conf.redactOptions(mapArg) + case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]]) case other => other } From 6c9b9dc05b815ce9130d5fad7452cda513f57648 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 22 Mar 2019 09:13:01 -0700 Subject: [PATCH 12/12] Add Statement suffix to new parsed plans. --- .../sql/catalyst/parser/AstBuilder.scala | 10 ++--- ...Table.scala => CreateTableStatement.scala} | 8 ++-- ...ogicalPlan.scala => ParsedStatement.scala} | 6 +-- .../sql/catalyst/parser/DDLParserSuite.scala | 44 +++++++++---------- .../datasources/DataSourceResolution.scala | 6 +-- 5 files changed, 37 insertions(+), 37 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/{CreateTable.scala => CreateTableStatement.scala} (92%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/{ParsedLogicalPlan.scala => ParsedStatement.scala} (92%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8756018cda933..52a5d2c66c5b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTable, CreateTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -2008,7 +2008,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a table, returning a [[CreateTable]] logical plan. 
+ * Create a table, returning a [[CreateTableStatement]] logical plan. * * Expected format: * {{{ @@ -2063,7 +2063,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx) case Some(query) => - CreateTableAsSelect( + CreateTableAsSelectStatement( table, query, partitionCols, bucketSpec, properties, provider, options, location, comment, ifNotExists = ifNotExists) @@ -2073,8 +2073,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) case _ => - CreateTable(table, schema.getOrElse(new StructType), partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists = ifNotExists) + CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec, + properties, provider, options, location, comment, ifNotExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala similarity index 92% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala index 3962910c38f4c..c734968e838db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType * * This is a metadata-only command and is not used to write data to the created table. */ -case class CreateTable( +case class CreateTableStatement( table: TableIdentifier, tableSchema: StructType, partitioning: Seq[String], @@ -38,7 +38,7 @@ case class CreateTable( options: Map[String, String], location: Option[String], comment: Option[String], - ifNotExists: Boolean) extends ParsedLogicalPlan { + ifNotExists: Boolean) extends ParsedStatement { override def output: Seq[Attribute] = Seq.empty @@ -48,7 +48,7 @@ case class CreateTable( /** * A CREATE TABLE AS SELECT command, as parsed from SQL. 
*/ -case class CreateTableAsSelect( +case class CreateTableAsSelectStatement( table: TableIdentifier, asSelect: LogicalPlan, partitioning: Seq[String], @@ -58,7 +58,7 @@ case class CreateTableAsSelect( options: Map[String, String], location: Option[String], comment: Option[String], - ifNotExists: Boolean) extends ParsedLogicalPlan { + ifNotExists: Boolean) extends ParsedStatement { override def output: Seq[Attribute] = Seq.empty diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala similarity index 92% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala index 7f008f4ca1c6f..510f2a1ba1e6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedLogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala @@ -25,15 +25,15 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * This is used to hold information parsed from SQL when there are multiple implementations of a * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2. * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then - * converting that v1 metadata to the v2 equivalent, the sql [[CreateTable]] plan is produced by - * the parser and converted once into both implementations. + * converting that v1 metadata to the v2 equivalent, the sql [[CreateTableStatement]] plan is + * produced by the parser and converted once into both implementations. * * Parsed logical plans are not resolved because they must be converted to concrete logical plans. * * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. 
*/ -private[sql] abstract class ParsedLogicalPlan extends LogicalPlan { +private[sql] abstract class ParsedStatement extends LogicalPlan { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]]) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 3a833c4a44ace..dae8f582c7716 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTable, CreateTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} class DDLParserSuite extends AnalysisTest { @@ -37,7 +37,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType() .add("a", IntegerType, nullable = true, "test") @@ -52,7 +52,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } @@ -64,7 +64,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) @@ -77,7 +77,7 @@ class DDLParserSuite extends AnalysisTest { assert(create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -87,7 +87,7 @@ class DDLParserSuite extends AnalysisTest { "USING parquet PARTITIONED BY (a)" parsePlan(query) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType() .add("a", IntegerType, nullable = true, "test") @@ -102,7 +102,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $query") } } @@ -112,7 +112,7 @@ class DDLParserSuite extends AnalysisTest { "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" 
parsePlan(query) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) @@ -125,7 +125,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $query") } } @@ -134,7 +134,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) @@ -147,7 +147,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -156,7 +156,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) @@ -169,7 +169,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -178,7 +178,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) @@ -191,7 +191,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -200,7 +200,7 @@ class DDLParserSuite extends AnalysisTest { val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("2g", Some("1m"))) assert(create.tableSchema == new StructType().add("a", IntegerType)) assert(create.partitioning.isEmpty) @@ -213,7 +213,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -243,7 +243,7 @@ class 
DDLParserSuite extends AnalysisTest { """.stripMargin parsePlan(sql) match { - case create: CreateTable => + case create: CreateTableStatement => assert(create.table == TableIdentifier("table_name")) assert(create.tableSchema == new StructType) assert(create.partitioning.isEmpty) @@ -256,7 +256,7 @@ class DDLParserSuite extends AnalysisTest { assert(!create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + + fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -298,7 +298,7 @@ class DDLParserSuite extends AnalysisTest { def checkParsing(sql: String): Unit = { parsePlan(sql) match { - case create: CreateTableAsSelect => + case create: CreateTableAsSelectStatement => assert(create.table == TableIdentifier("page_view", Some("mydb"))) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -310,8 +310,8 @@ class DDLParserSuite extends AnalysisTest { assert(create.ifNotExists) case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelect].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + fail(s"Expected to parse ${classOf[CreateTableAsSelectStatement].getClass.getName} " + + s"from query, got ${other.getClass.getName}: $sql") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index fe8789e2a8ae3..9fd44ea4e6379 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.logical.sql +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case sql.CreateTable( + case CreateTableStatement( table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, location, comment, ifNotExists) => @@ -42,7 +42,7 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca CreateTable(tableDesc, mode, None) - case sql.CreateTableAsSelect( + case CreateTableAsSelectStatement( table, query, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, location, comment, ifNotExists) =>
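
Note on the redaction change (patches 09 and 11 above): ParsedStatement hides sensitive table properties and options by overriding productIterator, because Spark's TreeNode derives its printable argument string from that iterator rather than from a generated toString. The standalone sketch below illustrates the idea outside of Spark and is only an assumption-laden approximation: RedactionSketch, RenderedFromProduct, CreateTableStatementLike and the redact helper are invented names, with redact standing in for SQLConf.redactOptions and render standing in for TreeNode's argument rendering.

object RedactionSketch {
  private val sensitiveKeys = Set("password", "secret")

  // Stand-in for SQLConf.redactOptions: mask values whose keys look sensitive.
  private def redact(options: Map[String, String]): Map[String, String] =
    options.map { case (k, v) =>
      if (sensitiveKeys.contains(k.toLowerCase)) k -> "*********(redacted)" else k -> v
    }

  // Mirrors how TreeNode builds its argument string from productIterator, which is why
  // overriding productIterator alone is enough to hide secrets in toString/plan dumps.
  trait RenderedFromProduct extends Product {
    def render: String = productIterator.mkString(s"$productPrefix(", ", ", ")")
  }

  case class CreateTableStatementLike(
      table: String,
      options: Map[String, String]) extends RenderedFromProduct {

    // Same shape as the override added to ParsedStatement in this patch: map-typed
    // arguments are redacted whenever generic code walks the product. The Map[_, _]
    // pattern avoids the unchecked type-erasure warning that Map[String, String] causes.
    override def productIterator: Iterator[Any] = super.productIterator.map {
      case m: Map[_, _] => redact(m.asInstanceOf[Map[String, String]])
      case other => other
    }
  }

  def main(args: Array[String]): Unit = {
    val stmt = CreateTableStatementLike(
      "my_tab", Map("url" -> "jdbc:h2:mem:db", "password" -> "hunter2"))
    // Prints: CreateTableStatementLike(my_tab, Map(url -> jdbc:h2:mem:db, password -> *********(redacted)))
    println(stmt.render)
  }
}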
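
Note on the statement/implementation split described in ParsedStatement's scaladoc: the parser emits a single neutral, unresolved node, and a resolution rule (DataSourceResolution above) converts it into either the v1 CreateTable command or a v2 plan depending on the provider. The sketch below is a simplified, hypothetical illustration of that routing under invented names (Statement, V1CreateTable, V2CreateTable, resolve); it is not the actual Spark rule, which additionally builds a CatalogTable and handles CTAS.

object StatementResolutionSketch {
  // The single parser output, deliberately left unresolved.
  sealed trait Statement { def resolved: Boolean = false }
  final case class CreateTableStatementSketch(table: String, provider: String) extends Statement

  // Two concrete implementations the statement can be converted into.
  sealed trait ResolvedPlan
  final case class V1CreateTable(table: String, provider: String) extends ResolvedPlan
  final case class V2CreateTable(table: String, provider: String) extends ResolvedPlan

  // Plays the role of DataSourceResolution: route the neutral statement to v1 or v2
  // based on whether the provider only supports the v1 write path.
  def resolve(stmt: Statement, v1Providers: Set[String]): ResolvedPlan = stmt match {
    case CreateTableStatementSketch(t, p) if v1Providers.contains(p) => V1CreateTable(t, p)
    case CreateTableStatementSketch(t, p) => V2CreateTable(t, p)
  }

  def main(args: Array[String]): Unit = {
    val stmt = CreateTableStatementSketch("my_tab", "parquet")
    println(resolve(stmt, v1Providers = Set("parquet", "json"))) // V1CreateTable(my_tab,parquet)
  }
}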