diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f5b808197c9be..08ce482a65cc6 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -139,8 +139,8 @@ statement
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*          #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec     #setTableLocation
     | ALTER TABLE tableIdentifier RECOVER PARTITIONS                  #recoverPartitions
-    | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                  #dropTable
-    | DROP VIEW (IF EXISTS)? tableIdentifier                          #dropTable
+    | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE?              #dropTable
+    | DROP VIEW (IF EXISTS)? multipartIdentifier                      #dropTable
     | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? VIEW (IF NOT EXISTS)?
         tableIdentifier identifierCommentList?
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java
new file mode 100644
index 0000000000000..2d52e5395a83c
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.spark.sql.catalog.v2.Catalogs.classKey;
+import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey;
+import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix;
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+@Private
+public class CatalogManager {
+
+  private final SQLConf conf;
+
+  public CatalogManager(SQLConf conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Load a catalog.
+   *
+   * @param name a catalog name
+   * @return a catalog plugin
+   */
+  public CatalogPlugin load(String name) throws SparkException {
+    return Catalogs.load(name, conf);
+  }
+
+  /**
+   * Add a catalog.
+   *
+   * @param name a catalog name
+   * @param pluginClassName a catalog plugin class name
+   * @param options catalog options
+   */
+  public void add(
+      String name,
+      String pluginClassName,
+      CaseInsensitiveStringMap options) {
+    options.entrySet().stream()
+        .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue()));
+    conf.setConfString(classKey(name), pluginClassName);
+  }
+
+  /**
+   * Add a catalog without options.
+   *
+   * @param name a catalog name
+   * @param pluginClassName a catalog plugin class name
+   */
+  public void add(
+      String name,
+      String pluginClassName) {
+    add(name, pluginClassName, CaseInsensitiveStringMap.empty());
+  }
+
+  /**
+   * Remove a catalog.
+   *
+   * @param name a catalog name
+   */
+  public void remove(String name) {
+    conf.unsetConf(classKey(name));
+    mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream()
+        .filter(key -> isOptionKey(name, key))
+        .forEach(conf::unsetConf);
+  }
+}
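For orientation, here is a rough Scala sketch of how `CatalogManager` is meant to be driven. The plugin class `com.example.MyCatalog` is hypothetical; any `CatalogPlugin` implementation on the classpath would do:

```scala
import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object CatalogManagerExample {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    val manager = new CatalogManager(conf)

    // add() writes spark.sql.catalog.mycat plus one
    // spark.sql.catalog.mycat.<option> entry per option into the conf.
    val options = new CaseInsensitiveStringMap(
      java.util.Collections.singletonMap("url", "jdbc:postgresql://localhost/db"))
    manager.add("mycat", "com.example.MyCatalog", options)

    // load() instantiates the class named by spark.sql.catalog.mycat
    // and initializes it with the collected options.
    val plugin = manager.load("mycat")

    // remove() unsets the class key and every option key for the catalog.
    manager.remove("mycat")
  }
}
```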
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index 851a6a9f6d165..d53e5bfb25142 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -23,10 +23,8 @@
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.util.Utils;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static scala.collection.JavaConverters.mapAsJavaMapConverter;
 
@@ -35,6 +33,23 @@ public class Catalogs {
   private Catalogs() {
   }
 
+  public static String classKey(String name) {
+    return "spark.sql.catalog." + name;
+  }
+
+  public static String optionKeyPrefix(String name) {
+    return "spark.sql.catalog." + name + ".";
+  }
+
+  public static boolean isOptionKey(String name, String keyName) {
+    return keyName.startsWith(optionKeyPrefix(name));
+  }
+
+  public static String optionName(String name, String keyName) {
+    assert(isOptionKey(name, keyName));
+    return keyName.substring(optionKeyPrefix(name).length());
+  }
+
   /**
    * Load and configure a catalog by name.
    *
@@ -49,10 +64,10 @@ private Catalogs() {
    */
   public static CatalogPlugin load(String name, SQLConf conf)
       throws CatalogNotFoundException, SparkException {
-    String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
+    String pluginClassName = conf.getConfString(classKey(name), null);
     if (pluginClassName == null) {
       throw new CatalogNotFoundException(String.format(
-          "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
+          "Catalog '%s' plugin class not found: %s is not defined", name, classKey(name)));
     }
 
     ClassLoader loader = Utils.getContextOrSparkClassLoader();
@@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf)
    * @return a case insensitive string map of options starting with spark.sql.catalog.(name).
    */
   private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
-    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
-    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
-
-    HashMap<String, String> options = new HashMap<>();
-    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
-      Matcher matcher = prefix.matcher(entry.getKey());
-      if (matcher.matches() && matcher.groupCount() > 0) {
-        options.put(matcher.group(1), entry.getValue());
-      }
-    }
-
+    Map<String, String> options =
+        mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream()
+            .filter(e -> isOptionKey(name, e.getKey()))
+            .collect(Collectors.toMap(
+                e -> optionName(name, e.getKey()),
+                e -> e.getValue()));
     return new CaseInsensitiveStringMap(options);
   }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
index cd131432008a6..51c5a589dd0fe 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
@@ -22,6 +22,8 @@
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * An {@link Identifier} implementation.
@@ -49,6 +51,21 @@ public String name() {
     return name;
   }
 
+  private String quote(String part) {
+    if (part.contains("`")) {
+      return part.replace("`", "``");
+    } else {
+      return part;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Stream.concat(Stream.of(namespace), Stream.of(name))
+        .map(part -> '`' + quote(part) + '`')
+        .collect(Collectors.joining("."));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
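The new `toString` renders identifiers unambiguously: each part is backtick-quoted and embedded backticks are doubled. A small illustration (not part of the patch):

```scala
import org.apache.spark.sql.catalog.v2.Identifier

// The doubled backtick marks a literal backtick inside the quoted part,
// so the printed form can be parsed back unambiguously.
val ident = Identifier.of(Array("ns1", "weird`ns"), "tbl")
assert(ident.toString == "`ns1`.`weird``ns`.`tbl`")
```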
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index da41346d7ce71..2bf1b6c22929e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) {
   public Map<String, String> asCaseSensitiveMap() {
     return Collections.unmodifiableMap(original);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+    return delegate.equals(that.delegate);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(delegate);
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 42755fc274192..6bf12cff28f9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -499,6 +499,14 @@ object OverwritePartitionsDynamic {
   }
 }
 
+/**
+ * Drop a table.
+ */
+case class DropTable(
+    catalog: TableCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends Command
+
 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala
new file mode 100644
index 0000000000000..bc31e57ac1b2b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DropTableStatement.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A DROP TABLE statement, as parsed from SQL.
+ */
+case class DropTableStatement(
+    tableName: Seq[String],
+    ifExists: Boolean,
+    isView: Boolean,
+    purge: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
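`DropTableStatement` is the raw parse result: it carries the unresolved multi-part name plus the SQL flags, and is later converted by `DataSourceResolution` (below) into either the new v2 `DropTable` plan or the old v1 command. A sketch of what the parser now produces:

```scala
import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement

// For `DROP TABLE IF EXISTS testcat.ns1.tbl`, the parser emits:
DropTableStatement(
  tableName = Seq("testcat", "ns1", "tbl"),
  ifExists = true,
  isView = false,
  purge = false)
```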
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java
new file mode 100644
index 0000000000000..880cb457fbef4
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogManagerSuite.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class CatalogManagerSuite {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  CatalogManager catalogManager = new CatalogManager(new SQLConf());
+
+  @Test
+  public void testAdd() throws SparkException {
+    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
+        new HashMap<String, String>() {{
+          put("option1", "value1");
+          put("option2", "value2");
+        }});
+    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options);
+    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
+    assertThat(catalogPlugin.name(), is("testcat"));
+    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
+    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options));
+  }
+
+  @Test
+  public void testAddWithoutOption() throws SparkException {
+    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
+    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
+    assertThat(catalogPlugin.name(), is("testcat"));
+    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
+    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty()));
+  }
+
+  @Test
+  public void testRemove() throws SparkException {
+    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
+    catalogManager.load("testcat");
+    catalogManager.remove("testcat");
+
+    exception.expect(CatalogNotFoundException.class);
+    catalogManager.load("testcat");
+  }
+}
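The `is(options)` assertion above only works because `CaseInsensitiveStringMap` now implements `equals`/`hashCode` over its lower-cased delegate map, so keys compare case-insensitively. A quick illustration (mine, not from the patch):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Keys are lower-cased into the delegate map, so equality ignores key case;
// values are compared as-is.
val a = new CaseInsensitiveStringMap(Map("Option1" -> "value1").asJava)
val b = new CaseInsensitiveStringMap(Map("option1" -> "value1").asJava)
assert(a == b && a.hashCode == b.hashCode)
```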
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala
new file mode 100644
index 0000000000000..663eed6e109da
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/CatalogV2TestUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.internal.SQLConf
+
+private[sql] trait CatalogV2TestUtils {
+
+  protected lazy val catalogManager: CatalogManager = new CatalogManager(SQLConf.get)
+
+  /**
+   * Adds a catalog.
+   */
+  protected def addCatalog(name: String, pluginClassName: String): Unit =
+    catalogManager.add(name, pluginClassName)
+
+  /**
+   * Removes catalogs.
+   */
+  protected def removeCatalog(catalogNames: String*): Unit =
+    catalogNames.foreach { catalogName =>
+      catalogManager.remove(catalogName)
+    }
+
+  /**
+   * Sets the default catalog.
+   *
+   * @param catalog the new default catalog
+   */
+  protected def setDefaultCatalog(catalog: String): Unit =
+    SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, catalog)
+
+  /**
+   * Returns the current default catalog.
+   */
+  protected def defaultCatalog: Option[String] = SQLConf.get.defaultV2Catalog
+
+  /**
+   * Restores the default catalog to the previously saved value.
+   */
+  protected def restoreDefaultCatalog(previous: Option[String]): Unit =
+    previous.foreach(SQLConf.get.setConfString(SQLConf.DEFAULT_V2_CATALOG.key, _))
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 78b4763484cc0..f0bb5739046ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -89,6 +89,8 @@ class TestTableCatalog extends TableCatalog {
   }
 
   override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+
+  override def toString: String = name
 }
 
 object TestTableCatalog {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f433cc8d32793..c292a8e301afc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -649,8 +649,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
-   * Create a [[DropTableCommand]] command.
+   * Create a [[DropTableStatement]].
    */
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
-    DropTableCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
+    sql.DropTableStatement(
+      visitMultipartIdentifier(ctx.multipartIdentifier()),
       ctx.EXISTS != null,
       ctx.VIEW != null,
       ctx.PURGE != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index acbe37349762b..bc874a0ac4deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DropTableCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.StructType
@@ -83,6 +84,12 @@ case class DataSourceResolution(
           s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
         .asTableCatalog
       convertCTAS(catalog, identifier, create)
+
+    case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _, _) =>
+      DropTable(catalog.asTableCatalog, ident, ifExists)
+
+    case DropTableStatement(AsTableIdentifier(tableName), ifExists, isView, purge) =>
+      DropTableCommand(tableName, ifExists, isView, purge)
   }
 
   object V1WriteProvider {
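The two new cases route by name shape: a name that binds to a v2 catalog (via the `CatalogObjectIdentifier` extractor) becomes the new `DropTable` plan, while a bare 1- or 2-part name falls back to the v1 `DropTableCommand`. Note that the v2 branch currently ignores the `isView` and `purge` flags. Roughly, with the test catalog from the suites below:

```scala
// Resolves through the v2 path:
// DropTable(testcat, Identifier.of(Array("ns1"), "tbl"), ifExists = false)
spark.sql("DROP TABLE testcat.ns1.tbl")

// No catalog part, so AsTableIdentifier matches and the v1 path runs:
// DropTableCommand(TableIdentifier("tbl", Some("db")), ...)
spark.sql("DROP TABLE db.tbl")
```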
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index a1d547eb7e86d..27d87960edb3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -199,6 +199,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       Nil
     }
 
+    case DropTable(catalog, ident, ifExists) =>
+      DropTableExec(catalog, ident, ifExists) :: Nil
+
     case _ => Nil
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
new file mode 100644
index 0000000000000..d325e0205f9d8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for dropping a table.
+ */
+case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean)
+  extends LeafExecNode {
+
+  override def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      catalog.dropTable(ident)
+    } else if (!ifExists) {
+      throw new NoSuchTableException(ident)
+    }
+
+    sqlContext.sparkContext.parallelize(Seq.empty, 1)
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
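`DropTableExec` gives `IF EXISTS` its semantics on the v2 path: a missing table is a no-op when the flag is set, and a `NoSuchTableException` otherwise. Mirroring the SQL suite below:

```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

// Without IF EXISTS, dropping a missing table fails at execution time.
intercept[NoSuchTableException] {
  spark.sql("DROP TABLE testcat.ns1.no_such_table")
}
// With IF EXISTS, the same statement silently succeeds.
spark.sql("DROP TABLE IF EXISTS testcat.ns1.no_such_table")
```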
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index d7bfbce73af05..e51d07f6587c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -32,13 +32,13 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
 import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 class DDLParserSuite extends PlanTest with SharedSQLContext {
   private lazy val parser = new SparkSqlParser(new SQLConf)
@@ -906,59 +906,39 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
   test("drop table") {
     val tableName1 = "db.tab"
     val tableName2 = "tab"
-
-    val parsed = Seq(
-      s"DROP TABLE $tableName1",
-      s"DROP TABLE IF EXISTS $tableName1",
-      s"DROP TABLE $tableName2",
-      s"DROP TABLE IF EXISTS $tableName2",
-      s"DROP TABLE $tableName2 PURGE",
-      s"DROP TABLE IF EXISTS $tableName2 PURGE"
-    ).map(parser.parsePlan)
-
-    val expected = Seq(
-      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
-        purge = false),
-      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
-        purge = false),
-      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
-        purge = false),
-      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
-        purge = false),
-      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
-        purge = true),
-      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
-        purge = true))
-
-    parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
+    Seq(
+      (s"DROP TABLE $tableName1",
+        DropTableStatement(Seq("db", "tab"), ifExists = false, isView = false, purge = false)),
+      (s"DROP TABLE IF EXISTS $tableName1",
+        DropTableStatement(Seq("db", "tab"), ifExists = true, isView = false, purge = false)),
+      (s"DROP TABLE $tableName2",
+        DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = false)),
+      (s"DROP TABLE IF EXISTS $tableName2",
+        DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = false)),
+      (s"DROP TABLE $tableName2 PURGE",
+        DropTableStatement(Seq("tab"), ifExists = false, isView = false, purge = true)),
+      (s"DROP TABLE IF EXISTS $tableName2 PURGE",
+        DropTableStatement(Seq("tab"), ifExists = true, isView = false, purge = true))).foreach {
+      case (sql, expected) =>
+        comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false)
+    }
   }
 
   test("drop view") {
     val viewName1 = "db.view"
     val viewName2 = "view"
-
-    val parsed1 = parser.parsePlan(s"DROP VIEW $viewName1")
-    val parsed2 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName1")
-    val parsed3 = parser.parsePlan(s"DROP VIEW $viewName2")
-    val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
-
-    val expected1 =
-      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
-        purge = false)
-    val expected2 =
-      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
-        purge = false)
-    val expected3 =
-      DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
-        purge = false)
-    val expected4 =
-      DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
-        purge = false)
-
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
+    Seq(
+      (s"DROP VIEW $viewName1",
+        DropTableStatement(Seq("db", "view"), ifExists = false, isView = true, purge = false)),
+      (s"DROP VIEW IF EXISTS $viewName1",
+        DropTableStatement(Seq("db", "view"), ifExists = true, isView = true, purge = false)),
+      (s"DROP VIEW $viewName2",
+        DropTableStatement(Seq("view"), ifExists = false, isView = true, purge = false)),
+      (s"DROP VIEW IF EXISTS $viewName2",
+        DropTableStatement(Seq("view"), ifExists = true, isView = true, purge = false))).foreach {
+      case (sql, expected) =>
+        comparePlans(parser.parsePlan(sql), expected, checkAnalysis = false)
+    }
   }
 
   test("show columns") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 2424e6e1d2d1e..f3126e4cf893a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -22,29 +22,35 @@ import scala.collection.JavaConverters._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalog.v2.Identifier
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalog.v2.{CatalogV2TestUtils, Identifier}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
 
-class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
+class DataSourceV2SQLSuite
+  extends QueryTest with SharedSQLContext with BeforeAndAfter with CatalogV2TestUtils {
 
   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
 
   private val orc2 = classOf[OrcDataSourceV2].getName
 
-  before {
-    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
-    spark.conf.set("spark.sql.default.catalog", "testcat")
+  private val previousDefaultCatalog = defaultCatalog
 
+  before {
     val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
     df.createOrReplaceTempView("source")
     val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
     df2.createOrReplaceTempView("source2")
+
+    addCatalog("testcat", classOf[TestInMemoryTableCatalog].getName)
+    setDefaultCatalog("testcat")
   }
 
   after {
+    restoreDefaultCatalog(previousDefaultCatalog)
+    removeCatalog("testcat")
+
     spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
     spark.sql("DROP TABLE source")
   }
@@ -266,4 +272,20 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
       conf.setConfString("spark.sql.default.catalog", originalDefaultCatalog)
     }
   }
+
+  test("DropTable: basic") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source")
+    assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === true)
+    sql(s"DROP TABLE $tableName")
+    assert(spark.catalog("testcat").asTableCatalog.tableExists(ident) === false)
+  }
+
+  test("DropTable: if exists") {
+    intercept[NoSuchTableException] {
+      sql(s"DROP TABLE testcat.db.notbl")
+    }
+    sql(s"DROP TABLE IF EXISTS testcat.db.notbl")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 2ecf1c2f184fb..b970d62bc30cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -102,6 +102,8 @@ class TestInMemoryTableCatalog extends TableCatalog {
   def clearTables(): Unit = {
     tables.clear()
   }
+
+  override def toString: String = name
 }
 
 /**
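Taken together, the feature can be exercised end to end roughly like this, assuming a `TableCatalog` implementation such as the test catalog above is registered under `spark.sql.catalog.testcat`:

```scala
spark.conf.set(
  "spark.sql.catalog.testcat",
  "org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog")

spark.sql("CREATE TABLE testcat.ns1.ns2.tbl USING foo AS SELECT 1 AS id")
spark.sql("DROP TABLE testcat.ns1.ns2.tbl")           // planned as DropTableExec
spark.sql("DROP TABLE IF EXISTS testcat.ns1.ns2.tbl") // no-op: table is gone
```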