diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java index 8874faa71b5bb..cd131432008a6 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java @@ -17,8 +17,12 @@ package org.apache.spark.sql.catalog.v2; +import com.google.common.base.Preconditions; import org.apache.spark.annotation.Experimental; +import java.util.Arrays; +import java.util.Objects; + /** * An {@link Identifier} implementation. */ @@ -29,6 +33,8 @@ class IdentifierImpl implements Identifier { private String name; IdentifierImpl(String[] namespace, String name) { + Preconditions.checkNotNull(namespace, "Identifier namespace cannot be null"); + Preconditions.checkNotNull(name, "Identifier name cannot be null"); this.namespace = namespace; this.name = name; } @@ -42,4 +48,23 @@ public String[] namespace() { public String name() { return name; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + IdentifierImpl that = (IdentifierImpl) o; + return Arrays.equals(namespace, that.namespace) && name.equals(that.name); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(namespace), name); + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java new file mode 100644 index 0000000000000..681629d2d5405 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; + +import java.util.Map; + +/** + * Catalog methods for working with Tables. + *

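+ * A typical interaction loads a catalog plugin and resolves tables by identifier. The sketch
+ * below is illustrative only: the catalog name, namespace, and table name are hypothetical,
+ * {@code conf} is assumed to be an in-scope SQLConf, and {@code asTableCatalog} assumes the
+ * CatalogV2Implicits conversions are imported.
+ * <pre>
+ *   val catalog = Catalogs.load("prod", conf).asTableCatalog
+ *   val ident = Identifier.of(Array("db"), "events")
+ *   if (catalog.tableExists(ident)) {
+ *     val table = catalog.loadTable(ident)
+ *   }
+ * </pre>
+ *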
+ * TableCatalog implementations may be case sensitive or case insensitive. Spark will pass
+ * {@link Identifier table identifiers} without modification. When catalyst analysis is case
+ * insensitive, field names passed to {@link #alterTable(Identifier, TableChange...)} are
+ * normalized to match the case used in the table schema when updating, renaming, or dropping
+ * existing columns.
+ */
+public interface TableCatalog extends CatalogPlugin {
+  /**
+   * List the tables in a namespace from the catalog.
+   *

+ * If the catalog supports views, this must return identifiers for only tables and not views. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for tables + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException; + + /** + * Load table metadata by {@link Identifier identifier} from the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist or is a view + */ + Table loadTable(Identifier ident) throws NoSuchTableException; + + /** + * Invalidate cached table metadata for an {@link Identifier identifier}. + *

+ * If the table is already loaded or cached, drop cached data. If the table does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a table identifier + */ + default void invalidateTable(Identifier ident) { + } + + /** + * Test whether a table exists using an {@link Identifier identifier} from the catalog. + *

+   * If the catalog supports views and contains a view for the identifier and not a table, this
+   * must return false.
+   *
+   * @param ident a table identifier
+   * @return true if the table exists, false otherwise
+   */
+  default boolean tableExists(Identifier ident) {
+    try {
+      return loadTable(ident) != null;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions transforms to use for partitioning data in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
+   * @throws UnsupportedOperationException If a requested partition transform is not supported
+   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+   */
+  Table createTable(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
+  /**
+   * Apply a set of {@link TableChange changes} to a table in the catalog.
+   *

+ * Implementations may reject the requested changes. If any change is rejected, none of the + * changes should be applied to the table. + *

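+ * For example, the sketch below applies a property change and a column addition in one call,
+ * so both succeed or neither does (the identifier, property value, and the TimestampType
+ * import are illustrative):
+ * <pre>
+ *   catalog.alterTable(ident,
+ *       TableChange.setProperty("retention", "7d"),
+ *       TableChange.addColumn(Array("ts"), TimestampType))
+ * </pre>
+ *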
+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @param changes changes to apply to the table + * @return updated metadata for the table + * @throws NoSuchTableException If the table doesn't exist or is a view + * @throws IllegalArgumentException If any change is rejected by the implementation. + */ + Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException; + + /** + * Drop a table in the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must not drop the view and must return false. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + */ + boolean dropTable(Identifier ident); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java new file mode 100644 index 0000000000000..9b87e676d9b2d --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link TableCatalog#alterTable}. For example, + *
+ * <pre>
+ *   import TableChange._
+ *   val catalog = Catalogs.load(name, conf)
+ *   catalog.asTableCatalog.alterTable(ident,
+ *       addColumn(Array("x"), IntegerType),
+ *       renameColumn(Array("a"), "b"),
+ *       deleteColumn(Array("c"))
+ *     )
+ * </pre>
+ */ +public interface TableChange { + + /** + * Create a TableChange for setting a table property. + *

+ * If the property already exists, it will be replaced with the new value. + * + * @param property the property name + * @param value the new property value + * @return a TableChange for the addition + */ + static TableChange setProperty(String property, String value) { + return new SetProperty(property, value); + } + + /** + * Create a TableChange for removing a table property. + *

+   * If the property does not exist, the change will succeed.
+   *
+   * @param property the property name
+   * @return a TableChange for the removal
+   */
+  static TableChange removeProperty(String property) {
+    return new RemoveProperty(property);
+  }
+
+  /**
+   * Create a TableChange for adding an optional column.
+   *

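+ * Field names address nested fields by path. For example (names are illustrative),
+ * {@code addColumn(Array("ts"), TimestampType)} adds a top-level column, while
+ * {@code addColumn(Array("point", "z"), DoubleType)} adds field {@code z} to an existing
+ * struct column {@code point}.
+ *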
+ * If the field already exists, the change will result in an {@link IllegalArgumentException}. + * If the new field is nested and its parent does not exist or is not a struct, the change will + * result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the new column + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String[] fieldNames, DataType dataType) { + return new AddColumn(fieldNames, dataType, true, null); + } + + /** + * Create a TableChange for adding a column. + *

+ * If the field already exists, the change will result in an {@link IllegalArgumentException}. + * If the new field is nested and its parent does not exist or is not a struct, the change will + * result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the new column + * @param dataType the new column's data type + * @param isNullable whether the new column can contain null + * @return a TableChange for the addition + */ + static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) { + return new AddColumn(fieldNames, dataType, isNullable, null); + } + + /** + * Create a TableChange for adding a column. + *

+ * If the field already exists, the change will result in an {@link IllegalArgumentException}. + * If the new field is nested and its parent does not exist or is not a struct, the change will + * result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the new column + * @param dataType the new column's data type + * @param isNullable whether the new column can contain null + * @param comment the new field's comment string + * @return a TableChange for the addition + */ + static TableChange addColumn( + String[] fieldNames, + DataType dataType, + boolean isNullable, + String comment) { + return new AddColumn(fieldNames, dataType, isNullable, comment); + } + + /** + * Create a TableChange for renaming a field. + *

+ * The name is used to find the field to rename. The new name will replace the leaf field name. + * For example, renameColumn(["a", "b", "c"], "x") should produce column a.b.x. + *

+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames the current field names + * @param newName the new name + * @return a TableChange for the rename + */ + static TableChange renameColumn(String[] fieldNames, String newName) { + return new RenameColumn(fieldNames, newName); + } + + /** + * Create a TableChange for updating the type of a field that is nullable. + *

+ * The field names are used to find the field to update. + *

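+ * For example, {@code updateColumnType(Array("id"), LongType)} widens a top-level integer
+ * column {@code id} to long (names are illustrative).
+ *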
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to update + * @param newDataType the new data type + * @return a TableChange for the update + */ + static TableChange updateColumnType(String[] fieldNames, DataType newDataType) { + return new UpdateColumnType(fieldNames, newDataType, true); + } + + /** + * Create a TableChange for updating the type of a field. + *

+ * The field names are used to find the field to update. + *

+   * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newDataType the new data type
+   * @param isNullable whether the column can contain null
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnType(
+      String[] fieldNames,
+      DataType newDataType,
+      boolean isNullable) {
+    return new UpdateColumnType(fieldNames, newDataType, isNullable);
+  }
+
+  /**
+   * Create a TableChange for updating the comment of a field.
+   *

+ * The name is used to find the field to update. + *

+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to update + * @param newComment the new comment + * @return a TableChange for the update + */ + static TableChange updateColumnComment(String[] fieldNames, String newComment) { + return new UpdateColumnComment(fieldNames, newComment); + } + + /** + * Create a TableChange for deleting a field. + *

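+ * For example, {@code deleteColumn(Array("point", "z"))} removes nested field {@code z} from
+ * struct column {@code point}, and {@code deleteColumn(Array("id"))} removes a top-level
+ * column (names are illustrative).
+ *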
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to delete + * @return a TableChange for the delete + */ + static TableChange deleteColumn(String[] fieldNames) { + return new DeleteColumn(fieldNames); + } + + /** + * A TableChange to set a table property. + *

+ * If the property already exists, it must be replaced with the new value. + */ + final class SetProperty implements TableChange { + private final String property; + private final String value; + + private SetProperty(String property, String value) { + this.property = property; + this.value = value; + } + + public String property() { + return property; + } + + public String value() { + return value; + } + } + + /** + * A TableChange to remove a table property. + *

+ * If the property does not exist, the change should succeed. + */ + final class RemoveProperty implements TableChange { + private final String property; + + private RemoveProperty(String property) { + this.property = property; + } + + public String property() { + return property; + } + } + + /** + * A TableChange to add a field. + *

+ * If the field already exists, the change must result in an {@link IllegalArgumentException}. + * If the new field is nested and its parent does not exist or is not a struct, the change must + * result in an {@link IllegalArgumentException}. + */ + final class AddColumn implements TableChange { + private final String[] fieldNames; + private final DataType dataType; + private final boolean isNullable; + private final String comment; + + private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) { + this.fieldNames = fieldNames; + this.dataType = dataType; + this.isNullable = isNullable; + this.comment = comment; + } + + public String[] fieldNames() { + return fieldNames; + } + + public DataType dataType() { + return dataType; + } + + public boolean isNullable() { + return isNullable; + } + + public String comment() { + return comment; + } + } + + /** + * A TableChange to rename a field. + *

+   * The name is used to find the field to rename. The new name will replace the leaf field name.
+   * For example, renameColumn(["a", "b", "c"], "x") should produce column a.b.x.
+   *

+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}. + */ + final class RenameColumn implements TableChange { + private final String[] fieldNames; + private final String newName; + + private RenameColumn(String[] fieldNames, String newName) { + this.fieldNames = fieldNames; + this.newName = newName; + } + + public String[] fieldNames() { + return fieldNames; + } + + public String newName() { + return newName; + } + } + + /** + * A TableChange to update the type of a field. + *

+ * The field names are used to find the field to update. + *

+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}. + */ + final class UpdateColumnType implements TableChange { + private final String[] fieldNames; + private final DataType newDataType; + private final boolean isNullable; + + private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNullable) { + this.fieldNames = fieldNames; + this.newDataType = newDataType; + this.isNullable = isNullable; + } + + public String[] fieldNames() { + return fieldNames; + } + + public DataType newDataType() { + return newDataType; + } + + public boolean isNullable() { + return isNullable; + } + } + + /** + * A TableChange to update the comment of a field. + *

+ * The field names are used to find the field to update. + *

+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}. + */ + final class UpdateColumnComment implements TableChange { + private final String[] fieldNames; + private final String newComment; + + private UpdateColumnComment(String[] fieldNames, String newComment) { + this.fieldNames = fieldNames; + this.newComment = newComment; + } + + public String[] fieldNames() { + return fieldNames; + } + + public String newComment() { + return newComment; + } + } + + /** + * A TableChange to delete a field. + *

+   * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+   */
+  final class DeleteColumn implements TableChange {
+    private final String[] fieldNames;
+
+    private DeleteColumn(String[] fieldNames) {
+      this.fieldNames = fieldNames;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+  }
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
index 009e89bd4eb60..7b264e7480e17 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -40,7 +40,7 @@ private Expressions() {
    * @param args expression arguments to the transform
    * @return a logical transform
    */
-  public Transform apply(String name, Expression... args) {
+  public static Transform apply(String name, Expression... args) {
     return LogicalExpressions.apply(name,
         JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
   }
@@ -51,7 +51,7 @@ public Transform apply(String name, Expression... args) {
    * @param name a column name
    * @return a named reference for the column
    */
-  public NamedReference column(String name) {
+  public static NamedReference column(String name) {
     return LogicalExpressions.reference(name);
   }
@@ -65,7 +65,7 @@ public NamedReference column(String name) {
    * @param <T> the JVM type of the value
    * @return a literal expression for the value
    */
-  public <T> Literal<T> literal(T value) {
+  public static <T> Literal<T> literal(T value) {
     return LogicalExpressions.literal(value);
   }
@@ -81,7 +81,7 @@ public <T> Literal<T> literal(T value) {
    * @param columns input columns for the bucket transform
    * @return a logical bucket transform with name "bucket"
    */
-  public Transform bucket(int numBuckets, String... columns) {
+  public static Transform bucket(int numBuckets, String... columns) {
     return LogicalExpressions.bucket(numBuckets,
         JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
   }
@@ -96,7 +96,7 @@ public Transform bucket(int numBuckets, String... columns) {
    * @param column an input column
    * @return a logical identity transform with name "identity"
    */
-  public Transform identity(String column) {
+  public static Transform identity(String column) {
     return LogicalExpressions.identity(column);
   }
@@ -110,7 +110,7 @@ public Transform identity(String column) {
    * @param column an input timestamp or date column
    * @return a logical yearly transform with name "years"
    */
-  public Transform years(String column) {
+  public static Transform years(String column) {
     return LogicalExpressions.years(column);
   }
@@ -125,7 +125,7 @@ public Transform years(String column) {
    * @param column an input timestamp or date column
    * @return a logical monthly transform with name "months"
    */
-  public Transform months(String column) {
+  public static Transform months(String column) {
     return LogicalExpressions.months(column);
   }
@@ -140,7 +140,7 @@ public Transform months(String column) {
    * @param column an input timestamp or date column
    * @return a logical daily transform with name "days"
    */
-  public Transform days(String column) {
+  public static Transform days(String column) {
     return LogicalExpressions.days(column);
   }
@@ -155,7 +155,7 @@ public Transform days(String column) {
    * @param column an input timestamp column
    * @return a logical hourly transform with name "hours"
    */
-  public Transform hours(String column) {
+  public static Transform hours(String column) {
     return LogicalExpressions.hours(column);
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
similarity index 72%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
index 78f979a2a9a44..482d3c22e2306 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -29,6 +32,10 @@
  *

* This interface can mix in the following interfaces to support different operations, like + * {@code SupportsRead}. + *

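+ * Since {@link #partitioning()} and {@link #properties()} have default implementations (see
+ * below), a minimal implementation only needs a name, a schema, and its capabilities. For
+ * example, a sketch in Scala (the class name is hypothetical; {@code util} and
+ * {@code Collections} refer to {@code java.util}):
+ * <pre>
+ *   case class SimpleTable(name: String, schema: StructType) extends Table {
+ *     override def capabilities: util.Set[TableCapability] = Collections.emptySet()
+ *   }
+ * </pre>
+ *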
+ * The default implementation of {@link #partitioning()} returns an empty array of partitions,
+ * and the default implementation of {@link #properties()} returns an empty map. These should
+ * be overridden by implementations that support partitioning and table properties.
  */
 @Evolving
 public interface Table {
@@ -45,6 +52,20 @@ public interface Table {
    */
   StructType schema();
 
+  /**
+   * Returns the physical partitioning of this table.
+   */
+  default Transform[] partitioning() {
+    return new Transform[0];
+  }
+
+  /**
+   * Returns the string map of table properties.
+   */
+  default Map<String, String> properties() {
+    return Collections.emptyMap();
+  }
+
   /**
    * Returns the set of capabilities for this table.
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
similarity index 93%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
index 4640c61faeb7c..7fff09fae6a3e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -66,7 +66,7 @@ public enum TableCapability {
   *

* Truncating a table removes all existing rows. *

- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsTruncate}. + * See {@code org.apache.spark.sql.sources.v2.writer.SupportsTruncate}. */ TRUNCATE, @@ -74,7 +74,7 @@ public enum TableCapability { * Signals that the table can replace existing data that matches a filter with appended data in * a write operation. *

- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}. + * See {@code org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}. */ OVERWRITE_BY_FILTER, @@ -82,7 +82,7 @@ public enum TableCapability { * Signals that the table can dynamically replace existing data partitions with appended data in * a write operation. *

- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}. + * See {@code org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}. */ OVERWRITE_DYNAMIC } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala new file mode 100644 index 0000000000000..f512cd5e23c6b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform} +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.types.StructType + +/** + * Conversion helpers for working with v2 [[CatalogPlugin]]. + */ +object CatalogV2Implicits { + implicit class PartitionTypeHelper(partitionType: StructType) { + def asTransforms: Array[Transform] = partitionType.names.map(LogicalExpressions.identity) + } + + implicit class BucketSpecHelper(spec: BucketSpec) { + def asTransform: BucketTransform = { + if (spec.sortColumnNames.nonEmpty) { + throw new AnalysisException( + s"Cannot convert bucketing with sort columns to a transform: $spec") + } + + LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*) + } + } + + implicit class TransformHelper(transforms: Seq[Transform]) { + def asPartitionColumns: Seq[String] = { + val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform]) + + if (nonIdTransforms.nonEmpty) { + throw new AnalysisException("Transforms cannot be converted to partition columns: " + + nonIdTransforms.map(_.describe).mkString(", ")) + } + + idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref => + val parts = ref.fieldNames + if (parts.size > 1) { + throw new AnalysisException(s"Cannot partition by nested column: $ref") + } else { + parts(0) + } + } + } + } + + implicit class CatalogHelper(plugin: CatalogPlugin) { + def asTableCatalog: TableCatalog = plugin match { + case tableCatalog: TableCatalog => + tableCatalog + case _ => + throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a TableCatalog") + } + } + + implicit class NamespaceHelper(namespace: Array[String]) { + def quoted: String = namespace.map(quote).mkString(".") + } + + implicit class IdentifierHelper(ident: Identifier) { + def quoted: String = { + if (ident.namespace.nonEmpty) { + ident.namespace.map(quote).mkString(".") + "." 
+ quote(ident.name) + } else { + quote(ident.name) + } + } + } + + implicit class MultipartIdentifierHelper(namespace: Seq[String]) { + def quoted: String = namespace.map(quote).mkString(".") + } + + private def quote(part: String): String = { + if (part.contains(".") || part.contains("`")) { + s"`${part.replace("`", "``")}`" + } else { + part + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala index 813d88255c6a2..2d4d6e7c6d5ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.catalog.v2.expressions -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, IntegerType, StringType} @@ -35,38 +33,6 @@ private[sql] object LogicalExpressions { // because this is only used for field names, the SQL conf passed in does not matter. private lazy val parser = new CatalystSqlParser(SQLConf.get) - def fromPartitionColumns(columns: String*): Array[IdentityTransform] = - columns.map(identity).toArray - - def fromBucketSpec(spec: BucketSpec): BucketTransform = { - if (spec.sortColumnNames.nonEmpty) { - throw new AnalysisException( - s"Cannot convert bucketing with sort columns to a transform: $spec") - } - - bucket(spec.numBuckets, spec.bucketColumnNames: _*) - } - - implicit class TransformHelper(transforms: Seq[Transform]) { - def asPartitionColumns: Seq[String] = { - val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform]) - - if (nonIdTransforms.nonEmpty) { - throw new AnalysisException("Transforms cannot be converted to partition columns: " + - nonIdTransforms.map(_.describe).mkString(", ")) - } - - idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref => - val parts = ref.fieldNames - if (parts.size > 1) { - throw new AnalysisException(s"Cannot partition by nested column: $ref") - } else { - parts(0) - } - } - } - } - def literal[T](value: T): LiteralValue[T] = { val internalLit = catalyst.expressions.Literal(value) literal(value, internalLit.dataType) @@ -183,17 +149,10 @@ private[sql] final case class LiteralValue[T](value: T, dataType: DataType) exte } private[sql] final case class FieldReference(parts: Seq[String]) extends NamedReference { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.MultipartIdentifierHelper override def fieldNames: Array[String] = parts.toArray - override def describe: String = parts.map(quote).mkString(".") + override def describe: String = parts.quoted override def toString: String = describe - - private def quote(part: String): String = { - if (part.contains(".") || part.contains("`")) { - s"`${part.replace("`", "``")}`" - } else { - part - } - } } private[sql] object FieldReference { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index 6d587abd8fd4d..f5e9a146bf359 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ +import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec /** @@ -25,13 +27,26 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ class DatabaseAlreadyExistsException(db: String) - extends AnalysisException(s"Database '$db' already exists") + extends NamespaceAlreadyExistsException(s"Database '$db' already exists") -class TableAlreadyExistsException(db: String, table: String) - extends AnalysisException(s"Table or view '$table' already exists in database '$db'") +class NamespaceAlreadyExistsException(message: String) extends AnalysisException(message) { + def this(namespace: Array[String]) = { + this(s"Namespace '${namespace.quoted}' already exists") + } +} + +class TableAlreadyExistsException(message: String) extends AnalysisException(message) { + def this(db: String, table: String) = { + this(s"Table or view '$table' already exists in database '$db'") + } + + def this(tableIdent: Identifier) = { + this(s"Table ${tableIdent.quoted} already exists") + } +} class TempTableAlreadyExistsException(table: String) - extends AnalysisException(s"Temporary view '$table' already exists") + extends TableAlreadyExistsException(s"Temporary view '$table' already exists") class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec) extends AnalysisException( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 8bf6f69f3b17a..7ac8ae61ed537 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ +import org.apache.spark.sql.catalog.v2.Identifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -25,10 +27,24 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. 
*/ -class NoSuchDatabaseException(val db: String) extends AnalysisException(s"Database '$db' not found") +class NoSuchDatabaseException( + val db: String) extends NoSuchNamespaceException(s"Database '$db' not found") -class NoSuchTableException(db: String, table: String) - extends AnalysisException(s"Table or view '$table' not found in database '$db'") +class NoSuchNamespaceException(message: String) extends AnalysisException(message) { + def this(namespace: Array[String]) = { + this(s"Namespace '${namespace.quoted}' not found") + } +} + +class NoSuchTableException(message: String) extends AnalysisException(message) { + def this(db: String, table: String) = { + this(s"Table or view '$table' not found in database '$db'") + } + + def this(tableIdent: Identifier) = { + this(s"Table ${tableIdent.quoted} not found") + } +} class NoSuchPartitionException( db: String, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala new file mode 100644 index 0000000000000..9c1b9a3e53de2 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2 + +import java.util +import java.util.Collections + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class TableCatalogSuite extends SparkFunSuite { + import CatalogV2Implicits._ + + private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + private val schema: StructType = new StructType() + .add("id", IntegerType) + .add("data", StringType) + + private def newCatalog(): TableCatalog = { + val newCatalog = new TestTableCatalog + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog + } + + private val testIdent = Identifier.of(Array("`", "."), "test_table") + + test("Catalogs can load the catalog") { + val catalog = newCatalog() + + val conf = new SQLConf + conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName) + + val loaded = Catalogs.load("test", conf) + assert(loaded.getClass == catalog.getClass) + } + + test("listTables") { + val catalog = newCatalog() + val ident1 = Identifier.of(Array("ns"), "test_table_1") + val ident2 = Identifier.of(Array("ns"), "test_table_2") + val ident3 = Identifier.of(Array("ns2"), "test_table_1") + + assert(catalog.listTables(Array("ns")).isEmpty) + + catalog.createTable(ident1, schema, Array.empty, emptyProps) + + assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) + assert(catalog.listTables(Array("ns2")).isEmpty) + + catalog.createTable(ident3, schema, Array.empty, emptyProps) + catalog.createTable(ident2, schema, Array.empty, emptyProps) + + assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) + assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) + + catalog.dropTable(ident1) + + assert(catalog.listTables(Array("ns")).toSet == Set(ident2)) + + catalog.dropTable(ident2) + + assert(catalog.listTables(Array("ns")).isEmpty) + assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) + } + + test("createTable") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) + assert(parsed == Seq("`", ".", "test_table")) + assert(table.schema == schema) + assert(table.properties.asScala == Map()) + + assert(catalog.tableExists(testIdent)) + } + + test("createTable: with properties") { + val catalog = newCatalog() + + val properties = new util.HashMap[String, String]() + properties.put("property", "value") + + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable(testIdent, schema, Array.empty, properties) + + val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) + assert(parsed == Seq("`", ".", "test_table")) + assert(table.schema == schema) + assert(table.properties == properties) + + assert(catalog.tableExists(testIdent)) + } + + test("createTable: table already exists") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + val exc = intercept[TableAlreadyExistsException] { + 
catalog.createTable(testIdent, schema, Array.empty, emptyProps) + } + + assert(exc.message.contains(table.name())) + assert(exc.message.contains("already exists")) + + assert(catalog.tableExists(testIdent)) + } + + test("tableExists") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + + catalog.dropTable(testIdent) + + assert(!catalog.tableExists(testIdent)) + } + + test("loadTable") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val loaded = catalog.loadTable(testIdent) + + assert(table.name == loaded.name) + assert(table.schema == loaded.schema) + assert(table.properties == loaded.properties) + } + + test("loadTable: table does not exist") { + val catalog = newCatalog() + + val exc = intercept[NoSuchTableException] { + catalog.loadTable(testIdent) + } + + assert(exc.message.contains(testIdent.quoted)) + assert(exc.message.contains("not found")) + } + + test("invalidateTable") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.invalidateTable(testIdent) + + val loaded = catalog.loadTable(testIdent) + + assert(table.name == loaded.name) + assert(table.schema == loaded.schema) + assert(table.properties == loaded.properties) + } + + test("invalidateTable: table does not exist") { + val catalog = newCatalog() + + assert(catalog.tableExists(testIdent) === false) + + catalog.invalidateTable(testIdent) + } + + test("alterTable: add property") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.properties.asScala == Map()) + + val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1")) + assert(updated.properties.asScala == Map("prop-1" -> "1")) + + val loaded = catalog.loadTable(testIdent) + assert(loaded.properties.asScala == Map("prop-1" -> "1")) + + assert(table.properties.asScala == Map()) + } + + test("alterTable: add property to existing") { + val catalog = newCatalog() + + val properties = new util.HashMap[String, String]() + properties.put("prop-1", "1") + + val table = catalog.createTable(testIdent, schema, Array.empty, properties) + + assert(table.properties.asScala == Map("prop-1" -> "1")) + + val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2")) + assert(updated.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2")) + + val loaded = catalog.loadTable(testIdent) + assert(loaded.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2")) + + assert(table.properties.asScala == Map("prop-1" -> "1")) + } + + test("alterTable: remove existing property") { + val catalog = newCatalog() + + val properties = new util.HashMap[String, String]() + properties.put("prop-1", "1") + + val table = catalog.createTable(testIdent, schema, Array.empty, properties) + + assert(table.properties.asScala == Map("prop-1" -> "1")) + + val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + assert(updated.properties.asScala == Map()) + + val loaded = catalog.loadTable(testIdent) + assert(loaded.properties.asScala == Map()) + + assert(table.properties.asScala == Map("prop-1" -> "1")) + } + + test("alterTable: remove missing property") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.properties.asScala == 
Map()) + + val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + assert(updated.properties.asScala == Map()) + + val loaded = catalog.loadTable(testIdent) + assert(loaded.properties.asScala == Map()) + + assert(table.properties.asScala == Map()) + } + + test("alterTable: add top-level column") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) + + assert(updated.schema == schema.add("ts", TimestampType)) + } + + test("alterTable: add required column") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, + TableChange.addColumn(Array("ts"), TimestampType, false)) + + assert(updated.schema == schema.add("ts", TimestampType, nullable = false)) + } + + test("alterTable: add column with comment") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, + TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) + + val field = StructField("ts", TimestampType, nullable = false).withComment("comment text") + assert(updated.schema == schema.add(field)) + } + + test("alterTable: add nested column") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.addColumn(Array("point", "z"), DoubleType)) + + val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType)) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: add column to primitive field fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, TableChange.addColumn(Array("data", "ts"), TimestampType)) + } + + assert(exc.getMessage.contains("Not a struct")) + assert(exc.getMessage.contains("data")) + + // the table has not changed + assert(catalog.loadTable(testIdent).schema == schema) + } + + test("alterTable: add field to missing column fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, + TableChange.addColumn(Array("missing_col", "new_field"), StringType)) + } + + assert(exc.getMessage.contains("missing_col")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: update column data type") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) + + val expectedSchema = new StructType().add("id", LongType).add("data", StringType) + assert(updated.schema == expectedSchema) + } + + test("alterTable: update column data type and nullability") { + val 
catalog = newCatalog() + + val originalSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("data", StringType) + val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps) + + assert(table.schema == originalSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.updateColumnType(Array("id"), LongType, true)) + + val expectedSchema = new StructType().add("id", LongType).add("data", StringType) + assert(updated.schema == expectedSchema) + } + + test("alterTable: update optional column to required fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType, false)) + } + + assert(exc.getMessage.contains("Cannot change optional column to required")) + assert(exc.getMessage.contains("id")) + } + + test("alterTable: update missing column fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, + TableChange.updateColumnType(Array("missing_col"), LongType)) + } + + assert(exc.getMessage.contains("missing_col")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: add comment") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, + TableChange.updateColumnComment(Array("id"), "comment text")) + + val expectedSchema = new StructType() + .add("id", IntegerType, nullable = true, "comment text") + .add("data", StringType) + assert(updated.schema == expectedSchema) + } + + test("alterTable: replace comment") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) + + val expectedSchema = new StructType() + .add("id", IntegerType, nullable = true, "replacement comment") + .add("data", StringType) + + val updated = catalog.alterTable(testIdent, + TableChange.updateColumnComment(Array("id"), "replacement comment")) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: add comment to missing column fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, + TableChange.updateColumnComment(Array("missing_col"), "comment")) + } + + assert(exc.getMessage.contains("missing_col")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: rename top-level column") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) + + val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: rename nested column") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.renameColumn(Array("point", "x"), "first")) + + val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) + val expectedSchema = schema.add("point", newPointStruct) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: rename struct column") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.renameColumn(Array("point"), "p")) + + val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val expectedSchema = schema.add("p", newPointStruct) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: rename missing column fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, + TableChange.renameColumn(Array("missing_col"), "new_name")) + } + + assert(exc.getMessage.contains("missing_col")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: multiple changes") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.renameColumn(Array("point", "x"), "first"), + TableChange.renameColumn(Array("point", "y"), "second")) + + val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) + val expectedSchema = schema.add("point", newPointStruct) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: delete top-level column") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val updated = catalog.alterTable(testIdent, + TableChange.deleteColumn(Array("id"))) + + val expectedSchema = new StructType().add("data", StringType) + assert(updated.schema == expectedSchema) + } + + test("alterTable: delete nested column") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val updated = catalog.alterTable(testIdent, + TableChange.deleteColumn(Array("point", "y"))) + + val newPointStruct = new StructType().add("x", DoubleType) + val expectedSchema = schema.add("point", newPointStruct) + + assert(updated.schema == expectedSchema) + } + + test("alterTable: delete missing column fails") { + val catalog = newCatalog() + + val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(table.schema == schema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, 
TableChange.deleteColumn(Array("missing_col"))) + } + + assert(exc.getMessage.contains("missing_col")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: delete missing nested column fails") { + val catalog = newCatalog() + + val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) + val tableSchema = schema.add("point", pointStruct) + + val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + + assert(table.schema == tableSchema) + + val exc = intercept[IllegalArgumentException] { + catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z"))) + } + + assert(exc.getMessage.contains("z")) + assert(exc.getMessage.contains("Cannot find")) + } + + test("alterTable: table does not exist") { + val catalog = newCatalog() + + val exc = intercept[NoSuchTableException] { + catalog.alterTable(testIdent, TableChange.setProperty("prop", "val")) + } + + assert(exc.message.contains(testIdent.quoted)) + assert(exc.message.contains("not found")) + } + + test("dropTable") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + + val wasDropped = catalog.dropTable(testIdent) + + assert(wasDropped) + assert(!catalog.tableExists(testIdent)) + } + + test("dropTable: table does not exist") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + + val wasDropped = catalog.dropTable(testIdent) + + assert(!wasDropped) + assert(!catalog.tableExists(testIdent)) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala new file mode 100644 index 0000000000000..7a0b014a85462 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+class TestTableCatalog extends TableCatalog {
+  import CatalogV2Implicits._
+
+  private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]()
+  private var _name: Option[String] = None
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    _name = Some(name)
+  }
+
+  override def name: String = _name.get
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    tables.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray
+  }
+
+  override def loadTable(ident: Identifier): Table = {
+    Option(tables.get(ident)) match {
+      case Some(table) =>
+        table
+      case _ =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+
+    if (tables.containsKey(ident)) {
+      throw new TableAlreadyExistsException(ident)
+    }
+
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    val table = InMemoryTable(ident.quoted, schema, properties)
+
+    tables.put(ident, table)
+
+    table
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    val table = loadTable(ident)
+    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
+    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val newTable = InMemoryTable(table.name, schema, properties)
+
+    tables.put(ident, newTable)
+
+    newTable
+  }
+
+  override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+}
+
+private object TestTableCatalog {
+  /**
+   * Apply property changes to a map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: util.Map[String, String],
+      changes: Seq[TableChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+        // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply schema changes to a schema and return the result.
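+   *
+   * Handles AddColumn, RenameColumn, UpdateColumnType, UpdateColumnComment, and DeleteColumn;
+   * any other change type is ignored.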
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val newField = StructField(name, add.dataType, nullable = add.isNullable)
+              Option(add.comment) match {
+                case Some(comment) =>
+                  schema.add(newField.withComment(comment))
+                case _ =>
+                  schema.add(newField)
+              }
+
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                  val newParentType = Option(add.comment) match {
+                    case Some(comment) =>
+                      parentType.add(field.withComment(comment))
+                    case None =>
+                      parentType.add(field)
+                  }
+
+                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+              })
+          }
+
+        case rename: RenameColumn =>
+          replace(schema, rename.fieldNames, field =>
+            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(schema, update.fieldNames, field => {
+            if (!update.isNullable && field.nullable) {
+              throw new IllegalArgumentException(
+                s"Cannot change optional column to required: ${field.name}")
+            }
+            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+          })
+
+        case update: UpdateColumnComment =>
+          replace(schema, update.fieldNames, field =>
+            Some(field.withComment(update.newComment)))
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  /**
+   * Apply `update` to the field addressed by `path`, recursing through nested structs;
+   * returning None from `update` removes the field.
+   */
+  private def replace(
+      struct: StructType,
+      path: Seq[String],
+      update: StructField => Option[StructField]): StructType = {
+
+    val pos = struct.getFieldIndex(path.head)
+      .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = if (path.tail.isEmpty) {
+      update(field)
+    } else {
+      field.dataType match {
+        case nestedStruct: StructType =>
+          val updatedType: StructType = replace(nestedStruct, path.tail, update)
+          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+        case _ =>
+          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+      }
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
+
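+/**
+ * A minimal unpartitioned Table that carries only a name, schema, and properties, and
+ * advertises no capabilities.
+ */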
+case class InMemoryTable(
+    name: String,
+    schema: StructType,
+    override val properties: util.Map[String, String]) extends Table {
+  override def partitioning: Array[Transform] = Array.empty
+  override def capabilities: util.Set[TableCapability] = InMemoryTable.CAPABILITIES
+}
+
+object InMemoryTable {
+  val CAPABILITIES: util.Set[TableCapability] = Set.empty[TableCapability].asJava
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index f503ff03b971c..30ecad642dc16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.StructType
 
 case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
-  import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.TransformHelper
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTableStatement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index c0c57b862055f..3b0cde5efbbd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -16,11 +16,14 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
@@ -35,6 +38,8 @@ abstract class FileTable(
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsRead with SupportsWrite {
 
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
   lazy val fileIndex: PartitioningAwareFileIndex = {
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -82,7 +87,11 @@ abstract class FileTable(
     StructType(fields)
   }
 
-  override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
+  override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms
+
+  override def properties: util.Map[String, String] = options.asCaseSensitiveMap
+
+  override def capabilities: java.util.Set[TableCapability] = FileTable.CAPABILITIES
 
   /**
    * When possible, this method should return the schema of the given `files`. When the format
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index dfbea927e477b..391af5a306a16 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.spark.sql.catalog.v2.expressions.Expressions;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.Table;
@@ -56,6 +58,11 @@ public Partitioning outputPartitioning() {
   @Override
   public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
+      @Override
+      public Transform[] partitioning() {
+        return new Transform[] { Expressions.identity("i") };
+      }
+
       @Override
       public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();