From b13a8e27c38638b3b7487565fd2d1b2ed7d54a9e Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 8 May 2019 17:13:28 -0700 Subject: [PATCH 1/8] Implement v2 CreateTableAsSelect. --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 17 ++ .../sql/catalyst/parser/AstBuilder.scala | 5 +- .../plans/logical/basicLogicalOperators.scala | 31 ++++ .../logical/sql/CreateTableStatement.scala | 5 +- .../apache/spark/sql/types/StructType.scala | 23 +++ ...eateTablePartitioningValidationSuite.scala | 153 ++++++++++++++++++ .../sql/catalyst/parser/DDLParserSuite.scala | 23 ++- .../spark/sql/execution/SparkSqlParser.scala | 28 +++- .../datasources/DataSourceResolution.scala | 66 +++++++- .../datasources/v2/DataSourceV2Strategy.scala | 9 +- .../v2/WriteToDataSourceV2Exec.scala | 58 +++++++ .../internal/BaseSessionStateBuilder.scala | 2 +- .../command/PlanResolutionSuite.scala | 97 ++++++++++- 14 files changed, 487 insertions(+), 32 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 7ed7152692cf2..a9e3099b90817 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -238,7 +238,7 @@ unsupportedHiveNativeCommands ; createTableHeader - : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? tableIdentifier + : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier ; bucketSpec diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 427f196344dfc..0682f8ff504c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.types._ */ trait CheckAnalysis extends PredicateHelper { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + /** * Override to provide additional checks for correct analysis. * These rules will be evaluated after our built-in check rules. 
@@ -296,6 +298,21 @@ trait CheckAnalysis extends PredicateHelper { } } + case CreateTableAsSelect(_, _, partitioning, query, _, _, _) => + val references = partitioning.flatMap(_.references).toSet + val badReferences = references.map(_.fieldNames).flatMap { column => + query.schema.findNestedField(column).map(_.dataType) match { + case Some(_) => + None + case _ => + Some(s"${column.quoted} is missing or is in a map or array") + } + } + + if (badReferences.nonEmpty) { + failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}") + } + case _ => // Fallbacks to the following checks } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 68cd1bd2f0a7b..228273cd3044e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2019,7 +2019,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging /** * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). */ - type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean) + type TableHeader = (Seq[String], Boolean, Boolean, Boolean) /** * Validate a create table statement and return the [[TableIdentifier]]. @@ -2031,7 +2031,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (temporary && ifNotExists) { operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx) } - (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null) + val multipartIdentifier: Seq[String] = ctx.multipartIdentifier.parts.asScala.map(_.getText) + (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 60a254d5271db..97baa1843c575 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} +import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.AliasIdentifier import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} @@ -402,6 +404,35 @@ trait V2WriteCommand extends Command { } } +/** + * Create a new table from a select query with a v2 catalog. 
+ */ +case class CreateTableAsSelect( + catalog: TableCatalog, + tableName: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + properties: Map[String, String], + writeOptions: Map[String, String], + ignoreIfExists: Boolean) extends Command { + + override def children: Seq[LogicalPlan] = Seq(query) + + override lazy val resolved: Boolean = { + // the table schema is created from the query schema, so the only resolution needed is to check + // that the columns referenced by the table's partitioning exist in the query schema + val references = partitioning.flatMap(_.references).toSet + references.map(_.fieldNames).forall { column => + query.schema.findNestedField(column).map(_.dataType) match { + case Some(_) => + true + case _ => + false + } + } + } +} + /** * Append data to an existing table. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala index ed1b3e3778c7f..7a26e01cde830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.plans.logical.sql import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -30,7 +29,7 @@ import org.apache.spark.sql.types.StructType * This is a metadata-only command and is not used to write data to the created table. */ case class CreateTableStatement( - table: TableIdentifier, + tableName: Seq[String], tableSchema: StructType, partitioning: Seq[Transform], bucketSpec: Option[BucketSpec], @@ -50,7 +49,7 @@ case class CreateTableStatement( * A CREATE TABLE AS SELECT command, as parsed from SQL. */ case class CreateTableAsSelectStatement( - table: TableIdentifier, + tableName: Seq[String], asSelect: LogicalPlan, partitioning: Seq[Transform], bucketSpec: Option[BucketSpec], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 73a86acc808d0..fe324a489432f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -307,6 +307,29 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru nameToIndex.get(name) } + /** + * Returns a field in this struct and its child structs. + * + * This does not support finding fields nested in maps or arrays. 
+ */ + private[sql] def findNestedField(fieldNames: Seq[String]): Option[StructField] = { + fieldNames.headOption.flatMap(nameToField.get) match { + case Some(field) => + if (fieldNames.tail.isEmpty) { + Some(field) + } else { + field.dataType match { + case struct: StructType => + struct.findNestedField(fieldNames.tail) + case _ => + None + } + } + case _ => + None + } + } + protected[sql] def toAttributes: Seq[AttributeReference] = map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala new file mode 100644 index 0000000000000..1ce8852f71bc8 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TestTableCatalog} +import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode} +import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class CreateTablePartitioningValidationSuite extends AnalysisTest { + import CreateTablePartitioningValidationSuite._ + + test("CreateTableAsSelect: fail missing top-level column") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "does_not_exist") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assert(!plan.resolved) + assertAnalysisError(plan, Seq( + "Invalid partitioning", + "does_not_exist is missing or is in a map or array")) + } + + test("CreateTableAsSelect: fail missing top-level column nested reference") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "does_not_exist.z") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assert(!plan.resolved) + assertAnalysisError(plan, Seq( + "Invalid partitioning", + "does_not_exist.z is missing or is in a map or array")) + } + + test("CreateTableAsSelect: fail missing nested column") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "point.z") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assert(!plan.resolved) + 
assertAnalysisError(plan, Seq( + "Invalid partitioning", + "point.z is missing or is in a map or array")) + } + + test("CreateTableAsSelect: fail with multiple errors") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "does_not_exist", "point.z") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assert(!plan.resolved) + assertAnalysisError(plan, Seq( + "Invalid partitioning", + "point.z is missing or is in a map or array", + "does_not_exist is missing or is in a map or array")) + } + + test("CreateTableAsSelect: success with top-level column") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "id") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assertAnalysisSuccess(plan) + } + + test("CreateTableAsSelect: success using nested column") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "point.x") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assertAnalysisSuccess(plan) + } + + test("CreateTableAsSelect: success using complex column") { + val plan = CreateTableAsSelect( + catalog, + Identifier.of(Array(), "table_name"), + LogicalExpressions.bucket(4, "point") :: Nil, + TestRelation2, + Map.empty, + Map.empty, + ignoreIfExists = false) + + assertAnalysisSuccess(plan) + } +} + +private object CreateTablePartitioningValidationSuite { + val catalog: TableCatalog = { + val cat = new TestTableCatalog() + cat.initialize("test", CaseInsensitiveStringMap.empty()) + cat + } + + val schema: StructType = new StructType() + .add("id", LongType) + .add("data", StringType) + .add("point", new StructType().add("x", DoubleType).add("y", DoubleType)) +} + +private case object TestRelation2 extends LeafNode with NamedRelation { + override def name: String = "source_relation" + override def output: Seq[AttributeReference] = + CreateTablePartitioningValidationSuite.schema.toAttributes +} + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 98388a74cd29d..08baebbf140e6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} @@ -40,7 +39,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType() .add("a", IntegerType, nullable = true, "test") .add("b", StringType)) @@ -67,7 +66,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == 
TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -90,7 +89,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(query) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType() .add("a", IntegerType, nullable = true, "test") .add("b", StringType)) @@ -125,7 +124,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType() .add("a", IntegerType) .add("b", StringType) @@ -161,7 +160,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(query) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b")))) @@ -183,7 +182,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -205,7 +204,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -227,7 +226,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("my_tab")) + assert(create.tableName == Seq("my_tab")) assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -249,7 +248,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("2g", Some("1m"))) + assert(create.tableName == Seq("1m", "2g")) assert(create.tableSchema == new StructType().add("a", IntegerType)) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -292,7 +291,7 @@ class DDLParserSuite extends AnalysisTest { parsePlan(sql) match { case create: CreateTableStatement => - assert(create.table == TableIdentifier("table_name")) + assert(create.tableName == Seq("table_name")) assert(create.tableSchema == new StructType) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) @@ -347,7 +346,7 @@ class DDLParserSuite extends AnalysisTest { def checkParsing(sql: String): Unit = { parsePlan(sql) match { case create: CreateTableAsSelectStatement => - assert(create.table == TableIdentifier("page_view", Some("mydb"))) + assert(create.tableName == Seq("mydb", "page_view")) assert(create.partitioning.isEmpty) assert(create.bucketSpec.isEmpty) assert(create.properties == Map("p1" -> 
"v1", "p2" -> "v2")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 96af8b34b8bc9..d9e351523c3e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -25,7 +25,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -366,6 +366,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { DescribeQueryCommand(source(ctx.query), visitQuery(ctx.query)) } + /** + * Converts a multi-part identifier to a TableIdentifier. + * + * If the multi-part identifier has too many parts, this will throw a ParseException. + */ + def tableIdentifier( + multipart: Seq[String], + command: String, + ctx: ParserRuleContext): TableIdentifier = { + multipart match { + case Seq(tableName) => + TableIdentifier(tableName) + case Seq(database, tableName) => + TableIdentifier(tableName, Some(database)) + case _ => + operationNotAllowed(s"$command does not support multi-part identifiers", ctx) + } + } + /** * Create a table, returning a [[CreateTable]] logical plan. * @@ -376,7 +395,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * it is deprecated. */ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { - val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + val (ident, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) if (!temp || ctx.query != null) { super.visitCreateTable(ctx) @@ -405,6 +424,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + "CREATE TEMPORARY VIEW ... USING ... instead") + val table = tableIdentifier(ident, "CREATE TEMPORARY VIEW", ctx) CreateTempViewUsing(table, schema, replace = false, global = false, provider, options) } } @@ -938,7 +958,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * }}} */ override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) { - val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + val (ident, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) // TODO: implement temporary tables if (temp) { throw new ParseException( @@ -996,6 +1016,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { CatalogTableType.MANAGED } + val name = tableIdentifier(ident, "CREATE TABLE ... STORED AS ...", ctx) + // TODO support the sql text - have a proper location for this! 
val tableDesc = CatalogTable( identifier = name, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 03d0c303dcc3d..0b22344679d5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -19,25 +19,34 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.mutable + import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.TableProvider import org.apache.spark.sql.types.StructType -case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.TransformHelper +case class DataSourceResolution( + conf: SQLConf, + findCatalog: String => CatalogPlugin) + extends Rule[LogicalPlan] with CastSupport with LookupCatalog { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + + override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case CreateTableStatement( - table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, - location, comment, ifNotExists) => + AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, + V1WriteProvider(provider), options, location, comment, ifNotExists) => val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, provider, options, location, comment, ifNotExists) @@ -46,14 +55,20 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca CreateTable(tableDesc, mode, None) case CreateTableAsSelectStatement( - table, query, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options, - location, comment, ifNotExists) => + AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, + V1WriteProvider(provider), options, location, comment, ifNotExists) => val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, properties, provider, options, location, comment, ifNotExists) val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, Some(query)) + + case create: CreateTableAsSelectStatement => + // the provider was not a v1 source, convert to a v2 plan + val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName + val catalog = maybeCatalog.getOrElse(findCatalog.apply("default")).asTableCatalog + convertCTAS(catalog, identifier, create) } object V1WriteProvider { @@ -112,4 
+127,41 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca properties = properties, comment = comment) } + + private def convertCTAS( + catalog: TableCatalog, + identifier: Identifier, + ctas: CreateTableAsSelectStatement): CreateTableAsSelect = { + if (ctas.options.contains("path") && ctas.location.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + + val options = ctas.options.filterKeys(_ != "path") + + // convert the bucket spec and add it as a transform + val partitioning = ctas.partitioning ++ ctas.bucketSpec.map(_.asTransform) + + // create table properties from TBLPROPERTIES and OPTIONS clauses + val properties = new mutable.HashMap[String, String]() + properties ++= ctas.properties + properties ++= options + + // convert USING, LOCATION, and COMMENT clauses to table properties + properties += ("provider" -> ctas.provider) + ctas.comment.map(text => properties += ("comment" -> text)) + ctas.location + .orElse(ctas.options.get("path")) + .map(loc => properties += ("location" -> loc)) + + CreateTableAsSelect( + catalog, + identifier, + partitioning, + ctas.asSelect, + properties.toMap, + writeOptions = options, + ignoreIfExists = ctas.ifNotExists) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 7681dc8dfb37a..4325aaa4e993a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -17,18 +17,20 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} +import org.apache.spark.sql.util.CaseInsensitiveStringMap object DataSourceV2Strategy extends Strategy with PredicateHelper { @@ -150,6 +152,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil + case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => + val writeOptions = new CaseInsensitiveStringMap(options.asJava) + CreateTableAsSelectExec( + catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil 
+ case AppendData(r: DataSourceV2Relation, query, _) => AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 607f2fa0f82c8..1797166bbe0b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.UUID +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.spark.{SparkEnv, SparkException, TaskContext} @@ -26,7 +27,10 @@ import org.apache.spark.executor.CommitDeniedException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} +import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} @@ -47,6 +51,60 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) override def output: Seq[Attribute] = Nil } +/** + * Physical plan node for v2 create table as select. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If either table creation or the append fails, the table will be deleted. This implementation does + * not provide an atomic CTAS. + */ +case class CreateTableAsSelectExec( + catalog: TableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: SparkPlan, + properties: Map[String, String], + writeOptions: CaseInsensitiveStringMap, + ifNotExists: Boolean) extends V2TableWriteExec { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper + + override protected def doExecute(): RDD[InternalRow] = { + if (catalog.tableExists(ident)) { + if (ifNotExists) { + return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) + } + + Utils.tryWithSafeFinallyAndFailureCallbacks({ + catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match { + case table: SupportsWrite => + val builder = table.newWriteBuilder(writeOptions) + .withInputDataSchema(query.schema) + .withQueryId(UUID.randomUUID().toString) + val batchWrite = builder match { + case supportsSaveMode: SupportsSaveMode => + supportsSaveMode.mode(SaveMode.Append).buildForBatch() + + case _ => + builder.buildForBatch() + } + + doWrite(batchWrite) + + case _ => + // table does not support writes + throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + } + + })(catchBlock = { + catalog.dropTable(ident) + }) + } +} + /** * Physical plan node for append into a v2 table. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index b0127535559cf..b2d065274b151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -169,7 +169,7 @@ abstract class BaseSessionStateBuilder( new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf) +: + DataSourceResolution(conf, session.catalog(_)) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 7fae54bb95ed1..41cb6440e8e5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -20,19 +20,45 @@ package org.apache.spark.sql.execution.command import java.net.URI import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableCatalog, TestTableCatalog} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} +import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ + private val orc2 = classOf[OrcDataSourceV2].getName + + private val defaultCatalog: TableCatalog = { + val newCatalog = new TestTableCatalog + newCatalog.initialize("default", CaseInsensitiveStringMap.empty()) + newCatalog + } + + private val testCat: TableCatalog = { + val newCatalog = new TestTableCatalog + newCatalog.initialize("testcat", CaseInsensitiveStringMap.empty()) + newCatalog + } + + private val lookupCatalog: String => CatalogPlugin = { + case "default" => + defaultCatalog + case "testcat" => + testCat + case name => + throw new CatalogNotFoundException(s"No such catalog: $name") + } + def parseAndResolve(query: String): LogicalPlan = { - DataSourceResolution(conf).apply(parsePlan(query)) + DataSourceResolution(conf, lookupCatalog).apply(parsePlan(query)) } private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { @@ -274,4 +300,71 @@ class PlanResolutionSuite extends AnalysisTest { assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) } } + + test("Test v2 CTAS with known catalog in identifier") { + val sql = + s""" + |CREATE TABLE IF NOT EXISTS testcat.mydb.table_name + |USING parquet + |COMMENT 'table comment' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |OPTIONS (path 's3://bucket/path/to/data', other 20) + |AS SELECT * FROM src + """.stripMargin + + val 
expectedProperties = Map( + "p1" -> "v1", + "p2" -> "v2", + "other" -> "20", + "provider" -> "parquet", + "location" -> "s3://bucket/path/to/data", + "comment" -> "table comment") + + parseAndResolve(sql) match { + case ctas: CreateTableAsSelect => + assert(ctas.catalog.name == "testcat") + assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(ctas.properties == expectedProperties) + assert(ctas.writeOptions == Map("other" -> "20")) + assert(ctas.partitioning.isEmpty) + assert(ctas.ignoreIfExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("Test v2 CTAS with data source v2 provider") { + val sql = + s""" + |CREATE TABLE IF NOT EXISTS mydb.page_view + |USING $orc2 + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin + + val expectedProperties = Map( + "p1" -> "v1", + "p2" -> "v2", + "provider" -> orc2, + "location" -> "/user/external/page_view", + "comment" -> "This is the staging page view table") + + parseAndResolve(sql) match { + case ctas: CreateTableAsSelect => + assert(ctas.catalog.name == "default") + assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) + assert(ctas.properties == expectedProperties) + assert(ctas.writeOptions.isEmpty) + assert(ctas.partitioning.isEmpty) + assert(ctas.ignoreIfExists) + + case other => + fail(s"Expected to parse ${classOf[CreateTableAsSelect].getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } } From a22c33578a0637fb093f6eeb4c00e8e09b7220b2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 9 May 2019 16:02:16 -0700 Subject: [PATCH 2/8] Add v2 SQL test suite. --- .../apache/spark/sql/internal/SQLConf.scala | 7 + .../sql/catalog/v2/TestTableCatalog.scala | 2 +- .../datasources/DataSourceResolution.scala | 6 +- .../command/PlanResolutionSuite.scala | 14 +- .../sources/v2/TestInMemoryTableCatalog.scala | 227 ++++++++++++++++++ 5 files changed, 244 insertions(+), 12 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7f577f015973d..8b9f3c53f8311 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1760,6 +1760,11 @@ object SQLConf { .internal() .intConf .createWithDefault(Int.MaxValue) + + val DEFAULT_V2_CATALOG = buildConf("spark.sql.default.catalog") + .doc("Name of the default v2 catalog, used when an catalog is not identified in queries") + .stringConf + .createOptional } /** @@ -2211,6 +2216,8 @@ class SQLConf extends Serializable with Logging { def setCommandRejectsSparkCoreConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS) + def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index 7a0b014a85462..78b4763484cc0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -91,7 +91,7 @@ class TestTableCatalog extends TableCatalog { override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined } -private object TestTableCatalog { +object TestTableCatalog { /** * Apply properties changes to a map and return the result. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 0b22344679d5f..163452cda8d67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -41,6 +41,8 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + private def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) + override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { @@ -67,7 +69,9 @@ case class DataSourceResolution( case create: CreateTableAsSelectStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.getOrElse(findCatalog.apply("default")).asTableCatalog + val catalog = maybeCatalog.orElse(defaultCatalog) + .getOrElse(throw new AnalysisException("Default catalog is not set")) + .asTableCatalog convertCTAS(catalog, identifier, create) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 41cb6440e8e5f..f8119fd862d65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -36,12 +36,6 @@ class PlanResolutionSuite extends AnalysisTest { private val orc2 = classOf[OrcDataSourceV2].getName - private val defaultCatalog: TableCatalog = { - val newCatalog = new TestTableCatalog - newCatalog.initialize("default", CaseInsensitiveStringMap.empty()) - newCatalog - } - private val testCat: TableCatalog = { val newCatalog = new TestTableCatalog newCatalog.initialize("testcat", CaseInsensitiveStringMap.empty()) @@ -49,8 +43,6 @@ class PlanResolutionSuite extends AnalysisTest { } private val lookupCatalog: String => CatalogPlugin = { - case "default" => - defaultCatalog case "testcat" => testCat case name => @@ -58,7 +50,9 @@ class PlanResolutionSuite extends AnalysisTest { } def parseAndResolve(query: String): LogicalPlan = { - DataSourceResolution(conf, lookupCatalog).apply(parsePlan(query)) + val newConf = conf.copy() + newConf.setConfString("spark.sql.default.catalog", "testcat") + DataSourceResolution(newConf, lookupCatalog).apply(parsePlan(query)) } private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { @@ -355,7 +349,7 @@ class PlanResolutionSuite extends AnalysisTest { 
parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "default") + assert(ctas.catalog.name == "testcat") assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) assert(ctas.properties == expectedProperties) assert(ctas.writeOptions.isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala new file mode 100644 index 0000000000000..e5b0edcb32022 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import java.util +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange, TestTableCatalog} +import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +// this is currently in the spark-sql module because the read and write API is not in catalyst +// TODO(rdblue): when the v2 source API is in catalyst, merge with TestTableCatalog/InMemoryTable +class TestInMemoryTableCatalog extends TableCatalog { + import CatalogV2Implicits._ + + private val tables: util.Map[Identifier, InMemoryTable] = + new ConcurrentHashMap[Identifier, InMemoryTable]() + private var _name: Option[String] = None + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + _name = Some(name) + } + + override def name: String = _name.get + + override def listTables(namespace: Array[String]): Array[Identifier] = { + tables.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray + } + + override def loadTable(ident: Identifier): Table = { + Option(tables.get(ident)) match { + case Some(table) => + table + case _ => + throw new NoSuchTableException(ident) + } + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + + if (tables.containsKey(ident)) { + throw new TableAlreadyExistsException(ident) + } + + 
if (partitions.nonEmpty) { + throw new UnsupportedOperationException( + s"Catalog $name: Partitioned tables are not supported") + } + + val table = new InMemoryTable(s"$name.${ident.quoted}", schema, properties) + + tables.put(ident, table) + + table + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + Option(tables.get(ident)) match { + case Some(table) => + val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes) + val schema = TestTableCatalog.applySchemaChanges(table.schema, changes) + val newTable = new InMemoryTable(table.name, schema, properties, table.data) + + tables.put(ident, newTable) + + newTable + case _ => + throw new NoSuchTableException(ident) + } + } + + override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + + def clearTables(): Unit = { + tables.clear() + } +} + +/** + * A simple in-memory table. Rows are stored as a buffered group produced by each output task. + */ +private class InMemoryTable( + val name: String, + val schema: StructType, + override val properties: util.Map[String, String]) + extends Table with SupportsRead with SupportsWrite { + + def this( + name: String, + schema: StructType, + properties: util.Map[String, String], + data: Array[BufferedRows]) = { + this(name, schema, properties) + replaceData(data) + } + + @volatile var data: Array[BufferedRows] = Array.empty + + def replaceData(buffers: Array[BufferedRows]): Unit = synchronized { + data = buffers + } + + override def capabilities: util.Set[TableCapability] = Set( + TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition])) + } + + class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch { + override def readSchema(): StructType = schema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = data + + override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory + } + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + new WriteBuilder with SupportsTruncate { + private var shouldTruncate: Boolean = false + + override def truncate(): WriteBuilder = { + shouldTruncate = true + this + } + + override def buildForBatch(): BatchWrite = { + if (shouldTruncate) TruncateAndAppend else Append + } + } + } + + private object TruncateAndAppend extends BatchWrite { + override def createBatchWriterFactory(): DataWriterFactory = { + BufferedRowsWriterFactory + } + + override def commit(messages: Array[WriterCommitMessage]): Unit = { + replaceData(messages.map(_.asInstanceOf[BufferedRows])) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + } + } + + private object Append extends BatchWrite { + override def createBatchWriterFactory(): DataWriterFactory = { + BufferedRowsWriterFactory + } + + override def commit(messages: Array[WriterCommitMessage]): Unit = { + replaceData(data ++ messages.map(_.asInstanceOf[BufferedRows])) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + } + } +} + +private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { + val rows = new mutable.ArrayBuffer[InternalRow]() +} + +private object BufferedRowsReaderFactory extends PartitionReaderFactory { + override def createReader(partition: 
InputPartition): PartitionReader[InternalRow] = { + new BufferedRowsReader(partition.asInstanceOf[BufferedRows]) + } +} + +private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] { + private var index: Int = -1 + + override def next(): Boolean = { + index += 1 + index < partition.rows.length + } + + override def get(): InternalRow = partition.rows(index) + + override def close(): Unit = {} +} + +private object BufferedRowsWriterFactory extends DataWriterFactory { + override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { + new BufferWriter + } +} + +private class BufferWriter extends DataWriter[InternalRow] { + private val buffer = new BufferedRows + + override def write(row: InternalRow): Unit = buffer.rows.append(row.copy()) + + override def commit(): WriterCommitMessage = buffer + + override def abort(): Unit = {} +} From cdf38051fd4ab85277c7e285acf92ed8ba3b8c1b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 9 May 2019 16:03:53 -0700 Subject: [PATCH 3/8] Update HiveSessionStateBuilder. --- .../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 70364e563bff7..0e7df8e921978 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -73,7 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: - DataSourceResolution(conf) +: + DataSourceResolution(conf, session.catalog(_)) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = From a21414387339fc4c0f0e2f80b70248a1f998456f Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 May 2019 11:56:59 -0700 Subject: [PATCH 4/8] Minor updates after comments. 
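
Follow-up to review comments: findNestedField already returns Option[StructField], so the extra .map(_.dataType) before the Option match in CheckAnalysis and in CreateTableAsSelect.resolved was redundant and is dropped, and InMemoryTable now exposes rows for the SQL test suite added later in this series. For reference only, a minimal sketch of how the new StructType.findNestedField helper resolves partition references against the schema used in CreateTablePartitioningValidationSuite (illustrative, not part of the diff; the method is private[sql], so it is only callable from Spark's own sql packages):

    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("id", LongType)
      .add("data", StringType)
      .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))

    schema.findNestedField(Seq("point", "x"))  // Some(StructField("x", DoubleType, ...)): valid partition reference
    schema.findNestedField(Seq("point", "z"))  // None: reported as "point.z is missing or is in a map or array"
    schema.findNestedField(Seq("missing"))     // None: missing top-level column, also rejected by CheckAnalysis
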
--- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 +- .../apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 0682f8ff504c1..ae5198c200b83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -301,7 +301,7 @@ trait CheckAnalysis extends PredicateHelper { case CreateTableAsSelect(_, _, partitioning, query, _, _, _) => val references = partitioning.flatMap(_.references).toSet val badReferences = references.map(_.fieldNames).flatMap { column => - query.schema.findNestedField(column).map(_.dataType) match { + query.schema.findNestedField(column) match { case Some(_) => None case _ => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 97baa1843c575..543f3440ee75e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -423,7 +423,7 @@ case class CreateTableAsSelect( // that the columns referenced by the table's partitioning exist in the query schema val references = partitioning.flatMap(_.references).toSet references.map(_.fieldNames).forall { column => - query.schema.findNestedField(column).map(_.dataType) match { + query.schema.findNestedField(column) match { case Some(_) => true case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index e5b0edcb32022..2ecf1c2f184fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -122,6 +122,8 @@ private class InMemoryTable( replaceData(data) } + def rows: Seq[InternalRow] = data.flatMap(_.rows) + @volatile var data: Array[BufferedRows] = Array.empty def replaceData(buffers: Array[BufferedRows]): Unit = synchronized { From fe01700c16dc8f89bb93460b131cb84332bc20d4 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 May 2019 12:17:10 -0700 Subject: [PATCH 5/8] Add missing test suite. --- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala new file mode 100644 index 0000000000000..32bfa29e99de1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2 + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{LongType, StringType, StructType} + +class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + + private val orc2 = classOf[OrcDataSourceV2].getName + + before { + spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set("spark.sql.default.catalog", "testcat") + + val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") + df.createOrReplaceTempView("source") + val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data") + df2.createOrReplaceTempView("source2") + } + + after { + spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() + spark.sql("DROP TABLE source") + } + + test("CreateTableAsSelect: use v2 plan because catalog is set") { + spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + } + + test("CreateTableAsSelect: use v2 plan because provider is v2") { + spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") + + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> orc2).asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + } + + test("CreateTableAsSelect: fail if table exists") { + spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + + val testCatalog = spark.catalog("testcat").asTableCatalog + + val table = 
testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + + // run a second CTAS query that should fail + val exc = intercept[TableAlreadyExistsException] { + spark.sql( + "CREATE TABLE testcat.table_name USING bar AS SELECT id, data, id as id2 FROM source2") + } + + assert(exc.getMessage.contains("table_name")) + + // table should not have changed + val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table2.name == "testcat.table_name") + assert(table2.partitioning.isEmpty) + assert(table2.properties == Map("provider" -> "foo").asJava) + assert(table2.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), spark.table("source")) + } + + test("CreateTableAsSelect: if not exists") { + spark.sql( + "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source") + + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name == "testcat.table_name") + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + + spark.sql( + "CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source2") + + // check that the table contains data from just the first CTAS + val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), spark.table("source")) + } + + test("CreateTableAsSelect: fail analysis when default catalog is needed but missing") { + val originalDefaultCatalog = conf.getConfString("spark.sql.default.catalog") + try { + conf.unsetConf("spark.sql.default.catalog") + + val exc = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source") + } + + assert(exc.getMessage.contains("Default catalog is not set")) + + } finally { + conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog) + } + } +} From b19a70dee4bf6b36ed8ed6c835a5729c2e171a83 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 13 May 2019 12:19:02 -0700 Subject: [PATCH 6/8] Remove default catalog. 
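Once the conf is removed, a v2 CTAS is only planned when the table identifier names a
registered catalog, and the tests that relied on an unqualified identifier are ignored below
until a default-catalog mechanism is reintroduced. A rough sketch of the resulting behavior,
assuming an active SparkSession `spark`, the test-only TestInMemoryTableCatalog and
OrcDataSourceV2 classes, and illustrative table names (this is not code added by this patch):

    // Mirrors the DataSourceV2SQLSuite setup: register a v2 catalog and a source view.
    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
    val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
    df.createOrReplaceTempView("source")

    // Catalog-qualified identifier: still converted to the v2 CreateTableAsSelect plan.
    spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")

    // Unqualified identifier with a v2 provider: there is no longer a catalog to fall back to,
    // so analysis now fails with AnalysisException("Default catalog is not set").
    spark.sql(s"CREATE TABLE other_table USING ${classOf[OrcDataSourceV2].getName} AS SELECT id, data FROM source")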
--- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 ------- .../sql/execution/datasources/DataSourceResolution.scala | 4 +--- .../spark/sql/execution/command/PlanResolutionSuite.scala | 3 ++- .../apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala | 3 ++- 4 files changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8b9f3c53f8311..7f577f015973d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1760,11 +1760,6 @@ object SQLConf { .internal() .intConf .createWithDefault(Int.MaxValue) - - val DEFAULT_V2_CATALOG = buildConf("spark.sql.default.catalog") - .doc("Name of the default v2 catalog, used when an catalog is not identified in queries") - .stringConf - .createOptional } /** @@ -2216,8 +2211,6 @@ class SQLConf extends Serializable with Logging { def setCommandRejectsSparkCoreConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS) - def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 163452cda8d67..8be6efe1c4fe4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -41,8 +41,6 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - private def defaultCatalog: Option[CatalogPlugin] = conf.defaultV2Catalog.map(findCatalog) - override def lookupCatalog: Option[String => CatalogPlugin] = Some(findCatalog) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { @@ -69,7 +67,7 @@ case class DataSourceResolution( case create: CreateTableAsSelectStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - val catalog = maybeCatalog.orElse(defaultCatalog) + val catalog = maybeCatalog .getOrElse(throw new AnalysisException("Default catalog is not set")) .asTableCatalog convertCTAS(catalog, identifier, create) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index f8119fd862d65..c525b4cbcba57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -329,7 +329,8 @@ class PlanResolutionSuite extends AnalysisTest { } } - test("Test v2 CTAS with data source v2 provider") { + // TODO(rblue): enable this test after the default catalog is available + ignore("Test v2 CTAS with data source v2 provider") { val sql = s""" |CREATE TABLE IF NOT EXISTS mydb.page_view diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 32bfa29e99de1..257727f1c442b 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -66,7 +66,8 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
   }
 
-  test("CreateTableAsSelect: use v2 plan because provider is v2") {
+  // TODO(rblue): enable this test after the default catalog is available
+  ignore("CreateTableAsSelect: use v2 plan because provider is v2") {
     spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
 
     val testCatalog = spark.catalog("testcat").asTableCatalog

From 8b6d8bafc0cafc2d88a7f4137bbd075da7d22291 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 14 May 2019 09:57:40 -0700
Subject: [PATCH 7/8] More changes from the review.

---
 .../plans/logical/basicLogicalOperators.scala |  9 +--------
 .../datasources/DataSourceResolution.scala    | 16 +++++++++++++++-
 .../sql/sources/v2/DataSourceV2SQLSuite.scala |  4 +++-
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 543f3440ee75e..2b98132f188f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -422,14 +422,7 @@ case class CreateTableAsSelect(
     // the table schema is created from the query schema, so the only resolution needed is to check
     // that the columns referenced by the table's partitioning exist in the query schema
     val references = partitioning.flatMap(_.references).toSet
-    references.map(_.fieldNames).forall { column =>
-      query.schema.findNestedField(column) match {
-        case Some(_) =>
-          true
-        case _ =>
-          false
-      }
-    }
+    references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 8be6efe1c4fe4..09506f05ccfa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -68,7 +68,8 @@ case class DataSourceResolution(
       // the provider was not a v1 source, convert to a v2 plan
       val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName
       val catalog = maybeCatalog
-          .getOrElse(throw new AnalysisException("Default catalog is not set"))
+          .getOrElse(throw new AnalysisException(
+            s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
           .asTableCatalog
       convertCTAS(catalog, identifier, create)
   }
@@ -140,6 +141,19 @@ case class DataSourceResolution(
         "you can only specify one of them.")
     }
 
+    if ((ctas.options.contains("comment") || ctas.properties.contains("comment"))
+        && ctas.comment.isDefined) {
+      throw new AnalysisException(
+        "COMMENT and option/property 'comment' are both used to set the table comment, you can " +
+          "only specify one of them.")
+    }
+
+    if (ctas.options.contains("provider") || ctas.properties.contains("provider")) {
+      throw new AnalysisException(
+        "USING and option/property 'provider' are both used to set the provider implementation, " +
+          "you can only specify one of them.")
+    }
+
     val options = ctas.options.filterKeys(_ != "path")
 
     // convert the bucket spec and add it as a transform
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 257727f1c442b..a9bc0369ad20f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -155,7 +155,9 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
         spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
       }
 
-      assert(exc.getMessage.contains("Default catalog is not set"))
+      assert(exc.getMessage.contains("No catalog specified for table"))
+      assert(exc.getMessage.contains("table_name"))
+      assert(exc.getMessage.contains("no default catalog is set"))
 
     } finally {
       conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog)

From 99ebc001d2563ad579e5e9a0211606cec4e396be Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 14 May 2019 09:59:02 -0700
Subject: [PATCH 8/8] Remove unnecessary declared type.

---
 .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 228273cd3044e..8137db854cbcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2031,7 +2031,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     if (temporary && ifNotExists) {
       operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
     }
-    val multipartIdentifier: Seq[String] = ctx.multipartIdentifier.parts.asScala.map(_.getText)
+    val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText)
     (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
   }
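Taken together, the partition-column validation for v2 CTAS now reduces to a presence check
against the query schema, in both CheckAnalysis and CreateTableAsSelect.resolved. Below is a
minimal standalone sketch of that check, assuming StructType.findNestedField (added earlier in
this series) accepts a field-name path and returns an Option; the schema, object name, and
references are illustrative and not code from the patch:

    package org.apache.spark.sql

    import org.apache.spark.sql.types._

    // Placed in the sql package in case the helper is package-private.
    object PartitionReferenceCheckSketch {
      def main(args: Array[String]): Unit = {
        // schema produced by the SELECT half of a CTAS statement
        val querySchema = new StructType()
          .add("id", LongType, nullable = false)
          .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))

        // field-name paths referenced by the table's partitioning transforms
        val partitionRefs: Seq[Seq[String]] = Seq(Seq("id"), Seq("point", "x"))

        // mirrors the simplified resolved check: every referenced column must be present
        // in the query schema (and not be nested inside a map or array)
        assert(partitionRefs.forall(querySchema.findNestedField(_).isDefined))

        // a missing column fails the check, which is what surfaces as the
        // "Invalid partitioning" error in CheckAnalysis
        assert(querySchema.findNestedField(Seq("ts")).isEmpty)
      }
    }

Because only presence matters, the check no longer needs the field's data type, which is why the
earlier `.map(_.dataType)` call and the explicit match were dropped.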