diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
index 8874faa71b5bb..cd131432008a6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
@@ -17,8 +17,12 @@
package org.apache.spark.sql.catalog.v2;
+import com.google.common.base.Preconditions;
import org.apache.spark.annotation.Experimental;
+import java.util.Arrays;
+import java.util.Objects;
+
/**
* An {@link Identifier} implementation.
*/
@@ -29,6 +33,8 @@ class IdentifierImpl implements Identifier {
private String name;
IdentifierImpl(String[] namespace, String name) {
+ Preconditions.checkNotNull(namespace, "Identifier namespace cannot be null");
+ Preconditions.checkNotNull(name, "Identifier name cannot be null");
this.namespace = namespace;
this.name = name;
}
@@ -42,4 +48,23 @@ public String[] namespace() {
public String name() {
return name;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ IdentifierImpl that = (IdentifierImpl) o;
+ return Arrays.equals(namespace, that.namespace) && name.equals(that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(Arrays.hashCode(namespace), name);
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
new file mode 100644
index 0000000000000..681629d2d5405
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Map;
+
+/**
+ * Catalog methods for working with Tables.
+ *
+ * TableCatalog implementations may be case sensitive or case insensitive. Spark will pass
+ * {@link Identifier table identifiers} without modification. Field names passed to
+ * {@link #alterTable(Identifier, TableChange...)} will be normalized to match the case used in the
+ * table schema when updating, renaming, or dropping existing columns when catalyst analysis is case
+ * insensitive.
+ */
+public interface TableCatalog extends CatalogPlugin {
+
+ /**
+ * List the tables in a namespace from the catalog.
+ *
+ * If the catalog supports views, this must return identifiers for only tables and not views.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for tables
+ * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+ */
+ Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
+
+ /**
+ * Load table metadata by {@link Identifier identifier} from the catalog.
+ *
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ Table loadTable(Identifier ident) throws NoSuchTableException;
+
+ /**
+ * Invalidate cached table metadata for an {@link Identifier identifier}.
+ *
+ * If the table is already loaded or cached, drop cached data. If the table does not exist or is
+ * not cached, do nothing. Calling this method should not query remote services.
+ *
+ * @param ident a table identifier
+ */
+ default void invalidateTable(Identifier ident) {
+ }
+
+ /**
+ * Test whether a table exists using an {@link Identifier identifier} from the catalog.
+ *
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must return false.
+ *
+ * @param ident a table identifier
+ * @return true if the table exists, false otherwise
+ */
+ default boolean tableExists(Identifier ident) {
+ try {
+ return loadTable(ident) != null;
+ } catch (NoSuchTableException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Create a table in the catalog.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param partitions transforms to use for partitioning data in the table
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws TableAlreadyExistsException If a table or view already exists for the identifier
+ * @throws UnsupportedOperationException If a requested partition transform is not supported
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ */
+ Table createTable(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
+ /**
+ * Apply a set of {@link TableChange changes} to a table in the catalog.
+ *
+ * Implementations may reject the requested changes. If any change is rejected, none of the
+ * changes should be applied to the table.
+ *
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param changes changes to apply to the table
+ * @return updated metadata for the table
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ * @throws IllegalArgumentException If any change is rejected by the implementation.
+ */
+ Table alterTable(
+ Identifier ident,
+ TableChange... changes) throws NoSuchTableException;
+
+ /**
+ * Drop a table in the catalog.
+ *
+ * If the catalog supports views and contains a view for the identifier and not a table, this
+ * must not drop the view and must return false.
+ *
+ * @param ident a table identifier
+ * @return true if a table was deleted, false if no table exists for the identifier
+ */
+ boolean dropTable(Identifier ident);
+}
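For reference, a minimal Scala sketch of driving this interface, mirroring the new TableCatalogSuite below; the catalog name "testcat", the namespace "db", and the table name "events" are illustrative only and not part of this patch:

    import java.util.Collections
    import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TestTableCatalog}
    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
    import org.apache.spark.sql.catalog.v2.expressions.Transform
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    val conf = new SQLConf
    // register a TableCatalog implementation under the illustrative name "testcat"
    conf.setConfString("spark.sql.catalog.testcat", classOf[TestTableCatalog].getName)

    val catalog = Catalogs.load("testcat", conf).asTableCatalog
    val ident = Identifier.of(Array("db"), "events")
    val schema = new StructType().add("id", LongType).add("data", StringType)

    // throws TableAlreadyExistsException if db.events already exists in the catalog
    catalog.createTable(
      ident, schema, Array.empty[Transform], Collections.emptyMap[String, String])
    assert(catalog.tableExists(ident))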
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
new file mode 100644
index 0000000000000..9b87e676d9b2d
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ *
+ * import TableChange._
+ * val catalog = Catalogs.load(name)
+ * catalog.asTableCatalog.alterTable(ident,
+ * addColumn("x", IntegerType),
+ * renameColumn("a", "b"),
+ * deleteColumn("c")
+ * )
+ *
+ */
+public interface TableChange {
+
+ /**
+ * Create a TableChange for setting a table property.
+ *
+ * If the property already exists, it will be replaced with the new value.
+ *
+ * @param property the property name
+ * @param value the new property value
+ * @return a TableChange for the addition
+ */
+ static TableChange setProperty(String property, String value) {
+ return new SetProperty(property, value);
+ }
+
+ /**
+ * Create a TableChange for removing a table property.
+ *
+ * If the property does not exist, the change will succeed.
+ *
+ * @param property the property name
+ * @return a TableChange for the removal
+ */
+ static TableChange removeProperty(String property) {
+ return new RemoveProperty(property);
+ }
+
+ /**
+ * Create a TableChange for adding an optional column.
+ *
+ * If the field already exists, the change will result in an {@link IllegalArgumentException}.
+ * If the new field is nested and its parent does not exist or is not a struct, the change will
+ * result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the new column
+ * @param dataType the new column's data type
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(String[] fieldNames, DataType dataType) {
+ return new AddColumn(fieldNames, dataType, true, null);
+ }
+
+ /**
+ * Create a TableChange for adding a column.
+ *
+ * If the field already exists, the change will result in an {@link IllegalArgumentException}.
+ * If the new field is nested and its parent does not exist or is not a struct, the change will
+ * result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the new column
+ * @param dataType the new column's data type
+ * @param isNullable whether the new column can contain null
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
+ return new AddColumn(fieldNames, dataType, isNullable, null);
+ }
+
+ /**
+ * Create a TableChange for adding a column.
+ *
+ * If the field already exists, the change will result in an {@link IllegalArgumentException}.
+ * If the new field is nested and its parent does not exist or is not a struct, the change will
+ * result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the new column
+ * @param dataType the new column's data type
+ * @param isNullable whether the new column can contain null
+ * @param comment the new field's comment string
+ * @return a TableChange for the addition
+ */
+ static TableChange addColumn(
+ String[] fieldNames,
+ DataType dataType,
+ boolean isNullable,
+ String comment) {
+ return new AddColumn(fieldNames, dataType, isNullable, comment);
+ }
+
+ /**
+ * Create a TableChange for renaming a field.
+ *
+ * The name is used to find the field to rename. The new name will replace the leaf field name.
+ * For example, renameColumn(["a", "b", "c"], "x") should produce column a.b.x.
+ *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames the current field names
+ * @param newName the new name
+ * @return a TableChange for the rename
+ */
+ static TableChange renameColumn(String[] fieldNames, String newName) {
+ return new RenameColumn(fieldNames, newName);
+ }
+
+ /**
+ * Create a TableChange for updating the type of a field that is nullable.
+ *
+ * The field names are used to find the field to update.
+ *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the column to update
+ * @param newDataType the new data type
+ * @return a TableChange for the update
+ */
+ static TableChange updateColumnType(String[] fieldNames, DataType newDataType) {
+ return new UpdateColumnType(fieldNames, newDataType, true);
+ }
+
+ /**
+ * Create a TableChange for updating the type of a field.
+ *
+ * The field names are used to find the field to update.
+ *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the column to update
+ * @param newDataType the new data type
+ * @return a TableChange for the update
+ */
+ static TableChange updateColumnType(
+ String[] fieldNames,
+ DataType newDataType,
+ boolean isNullable) {
+ return new UpdateColumnType(fieldNames, newDataType, isNullable);
+ }
+
+ /**
+ * Create a TableChange for updating the comment of a field.
+ *
+ * The name is used to find the field to update.
+ *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the column to update
+ * @param newComment the new comment
+ * @return a TableChange for the update
+ */
+ static TableChange updateColumnComment(String[] fieldNames, String newComment) {
+ return new UpdateColumnComment(fieldNames, newComment);
+ }
+
+ /**
+ * Create a TableChange for deleting a field.
+ *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
+ *
+ * @param fieldNames field names of the column to delete
+ * @return a TableChange for the delete
+ */
+ static TableChange deleteColumn(String[] fieldNames) {
+ return new DeleteColumn(fieldNames);
+ }
+
+ /**
+ * A TableChange to set a table property.
+ *
+ * If the property already exists, it must be replaced with the new value.
+ */
+ final class SetProperty implements TableChange {
+ private final String property;
+ private final String value;
+
+ private SetProperty(String property, String value) {
+ this.property = property;
+ this.value = value;
+ }
+
+ public String property() {
+ return property;
+ }
+
+ public String value() {
+ return value;
+ }
+ }
+
+ /**
+ * A TableChange to remove a table property.
+ *
+ * If the property does not exist, the change should succeed.
+ */
+ final class RemoveProperty implements TableChange {
+ private final String property;
+
+ private RemoveProperty(String property) {
+ this.property = property;
+ }
+
+ public String property() {
+ return property;
+ }
+ }
+
+ /**
+ * A TableChange to add a field.
+ *
+ * If the field already exists, the change must result in an {@link IllegalArgumentException}.
+ * If the new field is nested and its parent does not exist or is not a struct, the change must
+ * result in an {@link IllegalArgumentException}.
+ */
+ final class AddColumn implements TableChange {
+ private final String[] fieldNames;
+ private final DataType dataType;
+ private final boolean isNullable;
+ private final String comment;
+
+ private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
+ this.fieldNames = fieldNames;
+ this.dataType = dataType;
+ this.isNullable = isNullable;
+ this.comment = comment;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public DataType dataType() {
+ return dataType;
+ }
+
+ public boolean isNullable() {
+ return isNullable;
+ }
+
+ public String comment() {
+ return comment;
+ }
+ }
+
+ /**
+ * A TableChange to rename a field.
+ *
+ * The name is used to find the field to rename. The new name will replace the leaf field name.
+ * For example, renameColumn(["a", "b", "c"], "x") should produce column a.b.x.
+ *
+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+ */
+ final class RenameColumn implements TableChange {
+ private final String[] fieldNames;
+ private final String newName;
+
+ private RenameColumn(String[] fieldNames, String newName) {
+ this.fieldNames = fieldNames;
+ this.newName = newName;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public String newName() {
+ return newName;
+ }
+ }
+
+ /**
+ * A TableChange to update the type of a field.
+ *
+ * The field names are used to find the field to update.
+ *
+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+ */
+ final class UpdateColumnType implements TableChange {
+ private final String[] fieldNames;
+ private final DataType newDataType;
+ private final boolean isNullable;
+
+ private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNullable) {
+ this.fieldNames = fieldNames;
+ this.newDataType = newDataType;
+ this.isNullable = isNullable;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public DataType newDataType() {
+ return newDataType;
+ }
+
+ public boolean isNullable() {
+ return isNullable;
+ }
+ }
+
+ /**
+ * A TableChange to update the comment of a field.
+ *
+ * The field names are used to find the field to update.
+ *
+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+ */
+ final class UpdateColumnComment implements TableChange {
+ private final String[] fieldNames;
+ private final String newComment;
+
+ private UpdateColumnComment(String[] fieldNames, String newComment) {
+ this.fieldNames = fieldNames;
+ this.newComment = newComment;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public String newComment() {
+ return newComment;
+ }
+ }
+
+ /**
+ * A TableChange to delete a field.
+ *
+ * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+ */
+ final class DeleteColumn implements TableChange {
+ private final String[] fieldNames;
+
+ private DeleteColumn(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+ }
+
+}
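A short illustrative sketch of composing these factory methods for a single alterTable call; catalog and ident are assumed to be the TableCatalog and Identifier from the sketch after TableCatalog.java above:

    import org.apache.spark.sql.catalog.v2.TableChange
    import org.apache.spark.sql.types.{LongType, TimestampType}

    val changes = Seq(
      TableChange.setProperty("owner", "etl"),            // add or replace a table property
      TableChange.addColumn(Array("ts"), TimestampType),  // new optional top-level column
      TableChange.updateColumnType(Array("id"), LongType),
      TableChange.renameColumn(Array("data"), "payload"))

    // the catalog applies all of the changes, or none of them if any change is rejected
    val updated = catalog.alterTable(ident, changes: _*)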
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
index 009e89bd4eb60..7b264e7480e17 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -40,7 +40,7 @@ private Expressions() {
* @param args expression arguments to the transform
* @return a logical transform
*/
- public Transform apply(String name, Expression... args) {
+ public static Transform apply(String name, Expression... args) {
return LogicalExpressions.apply(name,
JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
}
@@ -51,7 +51,7 @@ public Transform apply(String name, Expression... args) {
* @param name a column name
* @return a named reference for the column
*/
- public NamedReference column(String name) {
+ public static NamedReference column(String name) {
return LogicalExpressions.reference(name);
}
@@ -65,7 +65,7 @@ public NamedReference column(String name) {
* @param <T> the JVM type of the value
* @return a literal expression for the value
*/
- public <T> Literal<T> literal(T value) {
+ public static <T> Literal<T> literal(T value) {
return LogicalExpressions.literal(value);
}
}
* This interface can mixin the following interfaces to support different operations, like
* {@code SupportsRead}.
+ *
+ * The default implementation of {@link #partitioning()} returns an empty array of partitions, and
+ * the default implementation of {@link #properties()} returns an empty map. These should be
+ * overridden by implementations that support partitioning and table properties.
*/
@Evolving
public interface Table {
@@ -45,6 +52,20 @@ public interface Table {
*/
StructType schema();
+ /**
+ * Returns the physical partitioning of this table.
+ */
+ default Transform[] partitioning() {
+ return new Transform[0];
+ }
+
+ /**
+ * Returns the string map of table properties.
+ */
+ default Map<String, String> properties() {
+ return Collections.emptyMap();
+ }
* Truncating a table removes all existing rows.
*
- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
+ * See {@code org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
*/
TRUNCATE,
@@ -74,7 +74,7 @@ public enum TableCapability {
* Signals that the table can replace existing data that matches a filter with appended data in
* a write operation.
*
- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
+ * See {@code org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
*/
OVERWRITE_BY_FILTER,
@@ -82,7 +82,7 @@ public enum TableCapability {
* Signals that the table can dynamically replace existing data partitions with appended data in
* a write operation.
*
- * See {@link org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
+ * See {@code org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
*/
OVERWRITE_DYNAMIC
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
new file mode 100644
index 0000000000000..f512cd5e23c6b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Conversion helpers for working with v2 [[CatalogPlugin]].
+ */
+object CatalogV2Implicits {
+ implicit class PartitionTypeHelper(partitionType: StructType) {
+ def asTransforms: Array[Transform] = partitionType.names.map(LogicalExpressions.identity)
+ }
+
+ implicit class BucketSpecHelper(spec: BucketSpec) {
+ def asTransform: BucketTransform = {
+ if (spec.sortColumnNames.nonEmpty) {
+ throw new AnalysisException(
+ s"Cannot convert bucketing with sort columns to a transform: $spec")
+ }
+
+ LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
+ }
+ }
+
+ implicit class TransformHelper(transforms: Seq[Transform]) {
+ def asPartitionColumns: Seq[String] = {
+ val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])
+
+ if (nonIdTransforms.nonEmpty) {
+ throw new AnalysisException("Transforms cannot be converted to partition columns: " +
+ nonIdTransforms.map(_.describe).mkString(", "))
+ }
+
+ idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref =>
+ val parts = ref.fieldNames
+ if (parts.size > 1) {
+ throw new AnalysisException(s"Cannot partition by nested column: $ref")
+ } else {
+ parts(0)
+ }
+ }
+ }
+ }
+
+ implicit class CatalogHelper(plugin: CatalogPlugin) {
+ def asTableCatalog: TableCatalog = plugin match {
+ case tableCatalog: TableCatalog =>
+ tableCatalog
+ case _ =>
+ throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a TableCatalog")
+ }
+ }
+
+ implicit class NamespaceHelper(namespace: Array[String]) {
+ def quoted: String = namespace.map(quote).mkString(".")
+ }
+
+ implicit class IdentifierHelper(ident: Identifier) {
+ def quoted: String = {
+ if (ident.namespace.nonEmpty) {
+ ident.namespace.map(quote).mkString(".") + "." + quote(ident.name)
+ } else {
+ quote(ident.name)
+ }
+ }
+ }
+
+ implicit class MultipartIdentifierHelper(namespace: Seq[String]) {
+ def quoted: String = namespace.map(quote).mkString(".")
+ }
+
+ private def quote(part: String): String = {
+ if (part.contains(".") || part.contains("`")) {
+ s"`${part.replace("`", "``")}`"
+ } else {
+ part
+ }
+ }
+}
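The quoting and transform helpers above are easiest to see by example; the identifier and column names below are illustrative only:

    import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier}
    import org.apache.spark.sql.catalog.v2.expressions.{LogicalExpressions, Transform}
    import CatalogV2Implicits._

    // parts containing '.' or '`' are back-quoted, and embedded back-ticks are doubled
    val ident = Identifier.of(Array("prod", "web.logs"), "daily`summary")
    assert(ident.quoted == "prod.`web.logs`.`daily``summary`")

    // only identity transforms can be converted back to v1-style partition columns
    val transforms: Seq[Transform] =
      Seq(LogicalExpressions.identity("region"), LogicalExpressions.identity("day"))
    assert(transforms.asPartitionColumns == Seq("region", "day"))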
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
index 813d88255c6a2..2d4d6e7c6d5ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
@@ -17,9 +17,7 @@
package org.apache.spark.sql.catalog.v2.expressions
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
@@ -35,38 +33,6 @@ private[sql] object LogicalExpressions {
// because this is only used for field names, the SQL conf passed in does not matter.
private lazy val parser = new CatalystSqlParser(SQLConf.get)
- def fromPartitionColumns(columns: String*): Array[IdentityTransform] =
- columns.map(identity).toArray
-
- def fromBucketSpec(spec: BucketSpec): BucketTransform = {
- if (spec.sortColumnNames.nonEmpty) {
- throw new AnalysisException(
- s"Cannot convert bucketing with sort columns to a transform: $spec")
- }
-
- bucket(spec.numBuckets, spec.bucketColumnNames: _*)
- }
-
- implicit class TransformHelper(transforms: Seq[Transform]) {
- def asPartitionColumns: Seq[String] = {
- val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])
-
- if (nonIdTransforms.nonEmpty) {
- throw new AnalysisException("Transforms cannot be converted to partition columns: " +
- nonIdTransforms.map(_.describe).mkString(", "))
- }
-
- idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref =>
- val parts = ref.fieldNames
- if (parts.size > 1) {
- throw new AnalysisException(s"Cannot partition by nested column: $ref")
- } else {
- parts(0)
- }
- }
- }
- }
-
def literal[T](value: T): LiteralValue[T] = {
val internalLit = catalyst.expressions.Literal(value)
literal(value, internalLit.dataType)
@@ -183,17 +149,10 @@ private[sql] final case class LiteralValue[T](value: T, dataType: DataType) exte
}
private[sql] final case class FieldReference(parts: Seq[String]) extends NamedReference {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.MultipartIdentifierHelper
override def fieldNames: Array[String] = parts.toArray
- override def describe: String = parts.map(quote).mkString(".")
+ override def describe: String = parts.quoted
override def toString: String = describe
-
- private def quote(part: String): String = {
- if (part.contains(".") || part.contains("`")) {
- s"`${part.replace("`", "``")}`"
- } else {
- part
- }
- }
}
private[sql] object FieldReference {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index 6d587abd8fd4d..f5e9a146bf359 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
/**
@@ -25,13 +27,26 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
* as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
*/
class DatabaseAlreadyExistsException(db: String)
- extends AnalysisException(s"Database '$db' already exists")
+ extends NamespaceAlreadyExistsException(s"Database '$db' already exists")
-class TableAlreadyExistsException(db: String, table: String)
- extends AnalysisException(s"Table or view '$table' already exists in database '$db'")
+class NamespaceAlreadyExistsException(message: String) extends AnalysisException(message) {
+ def this(namespace: Array[String]) = {
+ this(s"Namespace '${namespace.quoted}' already exists")
+ }
+}
+
+class TableAlreadyExistsException(message: String) extends AnalysisException(message) {
+ def this(db: String, table: String) = {
+ this(s"Table or view '$table' already exists in database '$db'")
+ }
+
+ def this(tableIdent: Identifier) = {
+ this(s"Table ${tableIdent.quoted} already exists")
+ }
+}
class TempTableAlreadyExistsException(table: String)
- extends AnalysisException(s"Temporary view '$table' already exists")
+ extends TableAlreadyExistsException(s"Temporary view '$table' already exists")
class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
extends AnalysisException(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 8bf6f69f3b17a..7ac8ae61ed537 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -25,10 +27,24 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
* Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
* as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
*/
-class NoSuchDatabaseException(val db: String) extends AnalysisException(s"Database '$db' not found")
+class NoSuchDatabaseException(
+ val db: String) extends NoSuchNamespaceException(s"Database '$db' not found")
-class NoSuchTableException(db: String, table: String)
- extends AnalysisException(s"Table or view '$table' not found in database '$db'")
+class NoSuchNamespaceException(message: String) extends AnalysisException(message) {
+ def this(namespace: Array[String]) = {
+ this(s"Namespace '${namespace.quoted}' not found")
+ }
+}
+
+class NoSuchTableException(message: String) extends AnalysisException(message) {
+ def this(db: String, table: String) = {
+ this(s"Table or view '$table' not found in database '$db'")
+ }
+
+ def this(tableIdent: Identifier) = {
+ this(s"Table ${tableIdent.quoted} not found")
+ }
+}
class NoSuchPartitionException(
db: String,
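For reference, the new Identifier-based constructors build their messages from the quoted identifier; a small illustrative sketch:

    import org.apache.spark.sql.catalog.v2.Identifier
    import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}

    val ident = Identifier.of(Array("db"), "events")
    new NoSuchTableException(ident)        // message: "Table db.events not found"
    new TableAlreadyExistsException(ident) // message: "Table db.events already exists"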
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
new file mode 100644
index 0000000000000..9c1b9a3e53de2
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TableCatalogSuite extends SparkFunSuite {
+ import CatalogV2Implicits._
+
+ private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+ private val schema: StructType = new StructType()
+ .add("id", IntegerType)
+ .add("data", StringType)
+
+ private def newCatalog(): TableCatalog = {
+ val newCatalog = new TestTableCatalog
+ newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+ newCatalog
+ }
+
+ private val testIdent = Identifier.of(Array("`", "."), "test_table")
+
+ test("Catalogs can load the catalog") {
+ val catalog = newCatalog()
+
+ val conf = new SQLConf
+ conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName)
+
+ val loaded = Catalogs.load("test", conf)
+ assert(loaded.getClass == catalog.getClass)
+ }
+
+ test("listTables") {
+ val catalog = newCatalog()
+ val ident1 = Identifier.of(Array("ns"), "test_table_1")
+ val ident2 = Identifier.of(Array("ns"), "test_table_2")
+ val ident3 = Identifier.of(Array("ns2"), "test_table_1")
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+
+ catalog.createTable(ident1, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
+ assert(catalog.listTables(Array("ns2")).isEmpty)
+
+ catalog.createTable(ident3, schema, Array.empty, emptyProps)
+ catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+ catalog.dropTable(ident1)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident2))
+
+ catalog.dropTable(ident2)
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+ }
+
+ test("createTable") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("`", ".", "test_table"))
+ assert(table.schema == schema)
+ assert(table.properties.asScala == Map())
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("createTable: with properties") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("property", "value")
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("`", ".", "test_table"))
+ assert(table.schema == schema)
+ assert(table.properties == properties)
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("createTable: table already exists") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val exc = intercept[TableAlreadyExistsException] {
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ }
+
+ assert(exc.message.contains(table.name()))
+ assert(exc.message.contains("already exists"))
+
+ assert(catalog.tableExists(testIdent))
+ }
+
+ test("tableExists") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(catalog.tableExists(testIdent))
+
+ catalog.dropTable(testIdent)
+
+ assert(!catalog.tableExists(testIdent))
+ }
+
+ test("loadTable") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ }
+
+ test("loadTable: table does not exist") {
+ val catalog = newCatalog()
+
+ val exc = intercept[NoSuchTableException] {
+ catalog.loadTable(testIdent)
+ }
+
+ assert(exc.message.contains(testIdent.quoted))
+ assert(exc.message.contains("not found"))
+ }
+
+ test("invalidateTable") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.invalidateTable(testIdent)
+
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ }
+
+ test("invalidateTable: table does not exist") {
+ val catalog = newCatalog()
+
+ assert(catalog.tableExists(testIdent) === false)
+
+ catalog.invalidateTable(testIdent)
+ }
+
+ test("alterTable: add property") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.properties.asScala == Map())
+
+ val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+ assert(updated.properties.asScala == Map("prop-1" -> "1"))
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map("prop-1" -> "1"))
+
+ assert(table.properties.asScala == Map())
+ }
+
+ test("alterTable: add property to existing") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("prop-1", "1")
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+ val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+ assert(updated.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+ }
+
+ test("alterTable: remove existing property") {
+ val catalog = newCatalog()
+
+ val properties = new util.HashMap[String, String]()
+ properties.put("prop-1", "1")
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+ val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ assert(updated.properties.asScala == Map())
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map())
+
+ assert(table.properties.asScala == Map("prop-1" -> "1"))
+ }
+
+ test("alterTable: remove missing property") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.properties.asScala == Map())
+
+ val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+ assert(updated.properties.asScala == Map())
+
+ val loaded = catalog.loadTable(testIdent)
+ assert(loaded.properties.asScala == Map())
+
+ assert(table.properties.asScala == Map())
+ }
+
+ test("alterTable: add top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+
+ assert(updated.schema == schema.add("ts", TimestampType))
+ }
+
+ test("alterTable: add required column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("ts"), TimestampType, false))
+
+ assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
+ }
+
+ test("alterTable: add column with comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+
+ val field = StructField("ts", TimestampType, nullable = false).withComment("comment text")
+ assert(updated.schema == schema.add(field))
+ }
+
+ test("alterTable: add nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("point", "z"), DoubleType))
+
+ val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: add column to primitive field fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.addColumn(Array("data", "ts"), TimestampType))
+ }
+
+ assert(exc.getMessage.contains("Not a struct"))
+ assert(exc.getMessage.contains("data"))
+
+ // the table has not changed
+ assert(catalog.loadTable(testIdent).schema == schema)
+ }
+
+ test("alterTable: add field to missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.addColumn(Array("missing_col", "new_field"), StringType))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: update column data type") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+
+ val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: update column data type and nullability") {
+ val catalog = newCatalog()
+
+ val originalSchema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("data", StringType)
+ val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps)
+
+ assert(table.schema == originalSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnType(Array("id"), LongType, true))
+
+ val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: update optional column to required fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType, false))
+ }
+
+ assert(exc.getMessage.contains("Cannot change optional column to required"))
+ assert(exc.getMessage.contains("id"))
+ }
+
+ test("alterTable: update missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.updateColumnType(Array("missing_col"), LongType))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: add comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("id"), "comment text"))
+
+ val expectedSchema = new StructType()
+ .add("id", IntegerType, nullable = true, "comment text")
+ .add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: replace comment") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
+
+ val expectedSchema = new StructType()
+ .add("id", IntegerType, nullable = true, "replacement comment")
+ .add("data", StringType)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("id"), "replacement comment"))
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: add comment to missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.updateColumnComment(Array("missing_col"), "comment"))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: rename top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+
+ val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point", "x"), "first"))
+
+ val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename struct column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point"), "p"))
+
+ val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val expectedSchema = schema.add("p", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: rename missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("missing_col"), "new_name"))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: multiple changes") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.renameColumn(Array("point", "x"), "first"),
+ TableChange.renameColumn(Array("point", "y"), "second"))
+
+ val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete top-level column") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.deleteColumn(Array("id")))
+
+ val expectedSchema = new StructType().add("data", StringType)
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete nested column") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val updated = catalog.alterTable(testIdent,
+ TableChange.deleteColumn(Array("point", "y")))
+
+ val newPointStruct = new StructType().add("x", DoubleType)
+ val expectedSchema = schema.add("point", newPointStruct)
+
+ assert(updated.schema == expectedSchema)
+ }
+
+ test("alterTable: delete missing column fails") {
+ val catalog = newCatalog()
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(table.schema == schema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col")))
+ }
+
+ assert(exc.getMessage.contains("missing_col"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: delete missing nested column fails") {
+ val catalog = newCatalog()
+
+ val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+ val tableSchema = schema.add("point", pointStruct)
+
+ val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+ assert(table.schema == tableSchema)
+
+ val exc = intercept[IllegalArgumentException] {
+ catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z")))
+ }
+
+ assert(exc.getMessage.contains("z"))
+ assert(exc.getMessage.contains("Cannot find"))
+ }
+
+ test("alterTable: table does not exist") {
+ val catalog = newCatalog()
+
+ val exc = intercept[NoSuchTableException] {
+ catalog.alterTable(testIdent, TableChange.setProperty("prop", "val"))
+ }
+
+ assert(exc.message.contains(testIdent.quoted))
+ assert(exc.message.contains("not found"))
+ }
+
+ test("dropTable") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(catalog.tableExists(testIdent))
+
+ val wasDropped = catalog.dropTable(testIdent)
+
+ assert(wasDropped)
+ assert(!catalog.tableExists(testIdent))
+ }
+
+ test("dropTable: table does not exist") {
+ val catalog = newCatalog()
+
+ assert(!catalog.tableExists(testIdent))
+
+ val wasDropped = catalog.dropTable(testIdent)
+
+ assert(!wasDropped)
+ assert(!catalog.tableExists(testIdent))
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
new file mode 100644
index 0000000000000..7a0b014a85462
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.sources.v2.{Table, TableCapability}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TestTableCatalog extends TableCatalog {
+ import CatalogV2Implicits._
+
+ private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]()
+ private var _name: Option[String] = None
+
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+ _name = Some(name)
+ }
+
+ override def name: String = _name.get
+
+ override def listTables(namespace: Array[String]): Array[Identifier] = {
+ tables.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray
+ }
+
+ override def loadTable(ident: Identifier): Table = {
+ Option(tables.get(ident)) match {
+ case Some(table) =>
+ table
+ case _ =>
+ throw new NoSuchTableException(ident)
+ }
+ }
+
+ override def createTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+
+ if (tables.containsKey(ident)) {
+ throw new TableAlreadyExistsException(ident)
+ }
+
+ if (partitions.nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"Catalog $name: Partitioned tables are not supported")
+ }
+
+ val table = InMemoryTable(ident.quoted, schema, properties)
+
+ tables.put(ident, table)
+
+ table
+ }
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ val table = loadTable(ident)
+ val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
+ val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+ val newTable = InMemoryTable(table.name, schema, properties)
+
+ tables.put(ident, newTable)
+
+ newTable
+ }
+
+ override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+}
+
+private object TestTableCatalog {
+ /**
+ * Apply properties changes to a map and return the result.
+ */
+ def applyPropertiesChanges(
+ properties: util.Map[String, String],
+ changes: Seq[TableChange]): util.Map[String, String] = {
+ val newProperties = new util.HashMap[String, String](properties)
+
+ changes.foreach {
+ case set: SetProperty =>
+ newProperties.put(set.property, set.value)
+
+ case unset: RemoveProperty =>
+ newProperties.remove(unset.property)
+
+ case _ =>
+ // ignore non-property changes
+ }
+
+ Collections.unmodifiableMap(newProperties)
+ }
+
+ /**
+ * Apply schema changes to a schema and return the result.
+ */
+ def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+ changes.foldLeft(schema) { (schema, change) =>
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val newField = StructField(name, add.dataType, nullable = add.isNullable)
+ Option(add.comment) match {
+ case Some(comment) =>
+ schema.add(newField.withComment(comment))
+ case _ =>
+ schema.add(newField)
+ }
+
+ case names =>
+ replace(schema, names.init, parent => parent.dataType match {
+ case parentType: StructType =>
+ val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+ val newParentType = Option(add.comment) match {
+ case Some(comment) =>
+ parentType.add(field.withComment(comment))
+ case None =>
+ parentType.add(field)
+ }
+
+ Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+ case _ =>
+ throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
+ })
+ }
+
+ case rename: RenameColumn =>
+ replace(schema, rename.fieldNames, field =>
+ Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+ case update: UpdateColumnType =>
+ replace(schema, update.fieldNames, field => {
+ if (!update.isNullable && field.nullable) {
+ throw new IllegalArgumentException(
+ s"Cannot change optional column to required: $field.name")
+ }
+ Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+ })
+
+ case update: UpdateColumnComment =>
+ replace(schema, update.fieldNames, field =>
+ Some(field.withComment(update.newComment)))
+
+ case delete: DeleteColumn =>
+ replace(schema, delete.fieldNames, _ => None)
+
+ case _ =>
+ // ignore non-schema changes
+ schema
+ }
+ }
+ }
+
+ private def replace(
+ struct: StructType,
+ path: Seq[String],
+ update: StructField => Option[StructField]): StructType = {
+
+ val pos = struct.getFieldIndex(path.head)
+ .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+ val field = struct.fields(pos)
+ val replacement: Option[StructField] = if (path.tail.isEmpty) {
+ update(field)
+ } else {
+ field.dataType match {
+ case nestedStruct: StructType =>
+ val updatedType: StructType = replace(nestedStruct, path.tail, update)
+ Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+ case _ =>
+ throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+ }
+ }
+
+ val newFields = struct.fields.zipWithIndex.flatMap {
+ case (_, index) if pos == index =>
+ replacement
+ case (other, _) =>
+ Some(other)
+ }
+
+ new StructType(newFields)
+ }
+}
+
+case class InMemoryTable(
+ name: String,
+ schema: StructType,
+ override val properties: util.Map[String, String]) extends Table {
+ override def partitioning: Array[Transform] = Array.empty
+ override def capabilities: util.Set[TableCapability] = InMemoryTable.CAPABILITIES
+}
+
+object InMemoryTable {
+ val CAPABILITIES: util.Set[TableCapability] = Set.empty[TableCapability].asJava
+}
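A short sketch of what the applySchemaChanges fold produces; this is callable like this only from code in the same package, since the helper object is private:

    import org.apache.spark.sql.catalog.v2.TableChange
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}

    val schema = new StructType().add("id", IntegerType).add("data", StringType)
    val changes = Seq(
      TableChange.addColumn(Array("ts"), TimestampType),
      TableChange.deleteColumn(Array("data")))

    // changes are folded over the schema in order: add ts, then drop data
    val updated = TestTableCatalog.applySchemaChanges(schema, changes)
    assert(updated == new StructType().add("id", IntegerType).add("ts", TimestampType))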
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index f503ff03b971c..30ecad642dc16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.StructType
case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
- import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.TransformHelper
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index c0c57b862055f..3b0cde5efbbd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -16,11 +16,14 @@
*/
package org.apache.spark.sql.execution.datasources.v2
+import java.util
+
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
@@ -35,6 +38,8 @@ abstract class FileTable(
userSpecifiedSchema: Option[StructType])
extends Table with SupportsRead with SupportsWrite {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
lazy val fileIndex: PartitioningAwareFileIndex = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
@@ -82,7 +87,11 @@ abstract class FileTable(
StructType(fields)
}
- override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
+ override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms
+
+ override def properties: util.Map[String, String] = options.asCaseSensitiveMap
+
+ override def capabilities: java.util.Set[TableCapability] = FileTable.CAPABILITIES
/**
* When possible, this method should return the schema of the given `files`. When the format
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index dfbea927e477b..391af5a306a16 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Arrays;
+import org.apache.spark.sql.catalog.v2.expressions.Expressions;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.Table;
@@ -56,6 +58,11 @@ public Partitioning outputPartitioning() {
@Override
public Table getTable(CaseInsensitiveStringMap options) {
return new JavaSimpleBatchTable() {
+ @Override
+ public Transform[] partitioning() {
+ return new Transform[] { Expressions.identity("i") };
+ }
+
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder();
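With the factory methods now static, transforms can be built without an Expressions instance; a minimal Scala illustration (the column names and the "truncate" transform name are only examples, meaningful to whichever catalog receives them):

    import org.apache.spark.sql.catalog.v2.expressions.{Expressions, Transform}

    // identity partitioning on a column, as used by JavaPartitionAwareDataSource above
    val byRegion: Transform = Expressions.identity("region")

    // a generic named transform over a column reference
    val truncated: Transform = Expressions.apply("truncate", Expressions.column("name"))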