From c8d58462154ae38b56b8ec6859940943d937cc29 Mon Sep 17 00:00:00 2001 From: stczwd Date: Fri, 22 May 2020 16:00:54 +0800 Subject: [PATCH 01/23] add supports partitions catalog api Change-Id: If8ae497644895167fd0f75de863411c3c37e2662 --- .../connector/catalog/SupportsPartitions.java | 107 ++++++++++++++++++ .../sql/connector/catalog/TablePartition.java | 54 +++++++++ 2 files changed, 161 insertions(+) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java new file mode 100644 index 0000000000000..8aa8442720f7a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; + +/** + * Catalog methods for working with Partitions. + */ +@Experimental +public interface SupportsPartitions extends TableCatalog { + + /** + * Create partitions in an existing table, assuming it exists. + * + * @param ident a table identifier + * @param partitions transforms to use for partitioning data in the table + * @param ignoreIfExists + */ + void createPartitions( + Identifier ident, + TablePartition[] partitions, + Boolean ignoreIfExists); + + /** + * Drop partitions from a table, assuming they exist. + * + * @param ident a table identifier + * @param partitions a list of string map for existing partitions + * @param ignoreIfNotExists + */ + void dropPartitions( + Identifier ident, + Map[] partitions, + Boolean ignoreIfNotExists); + + /** + * Override the specs of one or many existing table partitions, assuming they exist. + * + * @param ident a table identifier + * @param oldpartitions a list of string map for existing partitions to be renamed + * @param newPartitions a list of string map for new partitions + */ + void renamePartitions( + Identifier ident, + Map[] oldpartitions, + Map[] newPartitions); + + /** + * Alter one or many table partitions whose specs that match those specified in `parts`, + * assuming the partitions exist. 
+ * + * @param ident a table identifier + * @param partitions transforms to use for partitioning data in the table + */ + void alterPartitions( + Identifier ident, + TablePartition[] partitions); + + /** + * Retrieve the metadata of a table partition, assuming it exists. + * + * @param ident a table identifier + * @param partition a list of string map for existing partitions + */ + TablePartition getPartition( + Identifier ident, + Map partition); + + /** + * List the names of all partitions that belong to the specified table, assuming it exists. + * + * @param ident a table identifier + * @param partition a list of string map for existing partitions + */ + String[] listPartitionNames( + Identifier ident, + Map partition); + + /** + * List the metadata of all partitions that belong to the specified table, assuming it exists. + * + * @param ident a table identifier + * @param partition a list of string map for existing partitions + */ + TablePartition[] listPartitions( + Identifier ident, + Map partition); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java new file mode 100644 index 0000000000000..23c3295433a9e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connector.catalog; + +import java.util.HashMap; +import java.util.Map; + +public class TablePartition { + private Map partitionSpec; + private Map parametes; + + public TablePartition( + Map partitionSpec) { + this.partitionSpec = partitionSpec; + this.parametes = new HashMap(); + } + + public TablePartition( + Map partitionSpec, + Map parametes) { + this.partitionSpec = partitionSpec; + this.parametes = parametes; + } + + public Map getPartitionSpec() { + return partitionSpec; + } + + public void setPartitionSpec(Map partitionSpec) { + this.partitionSpec = partitionSpec; + } + + public Map getParametes() { + return parametes; + } + + public void setParameters(Map parametes) { + this.parametes = parametes; + } +} From 7a83f3091e52906dfc5c8244965ead2c85ba1492 Mon Sep 17 00:00:00 2001 From: stczwd Date: Sat, 23 May 2020 10:07:06 +0800 Subject: [PATCH 02/23] add partition catalog api test Change-Id: I56cd7d5f02b4fe9018a25bcc901bc23e6acaaed4 --- .../connector/catalog/SupportsPartitions.java | 2 - .../connector/InMemoryPartitionCatalog.scala | 176 ++++++++++++++++++ .../catalog/SupportsPartitionsSuite.scala | 174 +++++++++++++++++ 3 files changed, 350 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java index 8aa8442720f7a..b548d873d3063 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java @@ -19,8 +19,6 @@ import java.util.Map; import org.apache.spark.annotation.Experimental; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; /** * Catalog methods for working with Partitions. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala new file mode 100644 index 0000000000000..2324d49003eb8 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import java.{lang, util} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitions, TablePartition} + + +/** + * This class is used to test SupportsPartitions API. + */ +class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartitions { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + protected val memoryTablePartitions: util.Map[Identifier, mutable.HashSet[TablePartition]] = + new ConcurrentHashMap[Identifier, mutable.HashSet[TablePartition]]() + + def createPartitions( + ident: Identifier, + partitions: Array[TablePartition], + ignoreIfExists: lang.Boolean = false): Unit = { + assert(tableExists(ident)) + val table = loadTable(ident).asInstanceOf[InMemoryTable] + val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) + checkPartitionKeysExists(partitionKeys, partitions.map(_.getPartitionSpec)) + val newPartitions = partitions.map { partition => + val partitionSpec = partition.getPartitionSpec + partitionKeys.filterNot(partitionSpec.keySet().contains) + .foreach { partitionKey => partitionSpec.put(partitionKey, "")} + partition.setPartitionSpec(partitionSpec) + partition + } + val tablePartitions = + memoryTablePartitions.getOrDefault(ident, new mutable.HashSet[TablePartition]()) + newPartitions.foreach(tablePartitions.add) + memoryTablePartitions.put(ident, tablePartitions) + } + + def dropPartitions( + ident: Identifier, + partitions: Array[util.Map[String, String]], + ignoreIfNotExists: lang.Boolean = false, + purge: lang.Boolean = false, + retainData: lang.Boolean = false): Unit = { + assert(tableExists(ident)) + val table = loadTable(ident).asInstanceOf[InMemoryTable] + val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) + checkPartitionKeysExists(partitionKeys, partitions) + if (memoryTablePartitions.containsKey(ident)) { + val tablePartitions = memoryTablePartitions.get(ident) + tablePartitions.filter { tablePartition => + partitions.exists { partition => + partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) + } + }.foreach(tablePartitions.remove) + memoryTablePartitions.put(ident, tablePartitions) + } + } + + def renamePartitions( + ident: Identifier, + oldpartitions: Array[util.Map[String, String]], + newPartitions: Array[util.Map[String, String]]): Unit = { + assert(tableExists(ident)) + val table = loadTable(ident).asInstanceOf[InMemoryTable] + val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) + checkPartitionKeysExists(partitionKeys, oldpartitions) + checkPartitionKeysExists(partitionKeys, newPartitions) + if (memoryTablePartitions.containsKey(ident)) { + val tablePartitions = memoryTablePartitions.get(ident) + for (index <- oldpartitions.indices) { + tablePartitions.filter { tablePartition => + oldpartitions(index).asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) + }.foreach { tablePartition => + val newPartitionSpec = new util.HashMap[String, String](tablePartition.getPartitionSpec) + newPartitionSpec.putAll(newPartitions(index)) + tablePartition.setPartitionSpec(newPartitionSpec) + } + } + memoryTablePartitions.put(ident, tablePartitions) + } + } + + def alterPartitions( + ident: Identifier, + partitions: Array[TablePartition]): 
Unit = { + assert(tableExists(ident)) + assert(tableExists(ident)) + val table = loadTable(ident).asInstanceOf[InMemoryTable] + val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) + checkPartitionKeysExists(partitionKeys, partitions.map(_.getPartitionSpec)) + if (memoryTablePartitions.containsKey(ident)) { + val tablePartitions = memoryTablePartitions.get(ident) + partitions.foreach { partition => + tablePartitions.filter(_.getPartitionSpec == partition.getPartitionSpec) + .foreach(tablePartitions.remove) + tablePartitions.add(partition) + } + } + } + + def getPartition( + ident: Identifier, + partition: util.Map[String, String]): TablePartition = { + assert(tableExists(ident)) + val table = loadTable(ident).asInstanceOf[InMemoryTable] + val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) + checkPartitionKeysExists(partitionKeys, Array(partition)) + memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) + .find(_.getPartitionSpec == partition) + .getOrElse { + throw new NoSuchPartitionException( + ident.namespace().quoted, + ident.name(), + partition.asScala.toMap) + } + } + + def listPartitionNames( + ident: Identifier, + partition: util.Map[String, String] = new util.HashMap()): Array[String] = { + assert(tableExists(ident)) + memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) + .filter { tablePartition => + partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) + }.map { tablePartition => + tablePartition.getPartitionSpec.asScala.map { kv => + s"${kv._1}=${kv._2}" + }.mkString("/") + }.toArray + } + + def listPartitions( + ident: Identifier, + partition: util.Map[String, String] = new util.HashMap()): Array[TablePartition] = { + assert(tableExists(ident)) + memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) + .filter { tablePartition => + partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) + }.toArray + } + + private def checkPartitionKeysExists( + partitionKeys: Array[String], + partitions: Array[util.Map[String, String]]): Unit = { + partitions.foreach { partition => + val errorPartitionKeys = partition.keySet().asScala.filterNot(partitionKeys.contains) + if (errorPartitionKeys.nonEmpty) { + throw new IllegalArgumentException( + s"Partition Keys not exists, table partitions: ${partitionKeys.mkString("{", ",", "}")}") + } + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala new file mode 100644 index 0000000000000..53a1b25c54336 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException +import org.apache.spark.sql.connector.InMemoryPartitionCatalog +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class SupportsPartitionsSuite extends SparkFunSuite { + + private val ident: Identifier = Identifier.of(Array("ns"), "test_table") + def ref(name: String): NamedReference = LogicalExpressions.parseReference(name) + + private val catalog: InMemoryPartitionCatalog = { + val newCatalog = new InMemoryPartitionCatalog + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog.createTable( + ident, + new StructType() + .add("id", IntegerType) + .add("data", StringType) + .add("dt", StringType), + Array(LogicalExpressions.identity(ref("dt"))), + util.Collections.emptyMap[String, String]) + newCatalog + } + + test("createPartitions") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("dropPartitions") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + catalog.createPartitions(ident, Array(part1)) + assert(catalog.listPartitionNames(ident).length == 2) + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + assert(catalog.listPartitionNames(ident).length == 1) + catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) + assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("renamePartition") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + catalog.renamePartitions( + ident, + Array(Map("dt" -> "3").asJava), + Array(Map("dt" -> "4").asJava)) + val partition1 = catalog.getPartition(ident, Map("dt" -> "4").asJava) + assert(partition1.getPartitionSpec.get("dt") == "4") + assertThrows[NoSuchPartitionException]( + catalog.getPartition(ident, Map("dt" -> "3").asJava)) + catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) + 
assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("alterPartition") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + assert(partition.getParametes.isEmpty) + val part1 = new TablePartition(Map("dt" -> "3").asJava, Map("dt" -> "3").asJava) + catalog.alterPartitions(ident, Array(part1)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition1 = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition1.getPartitionSpec.get("dt") == "3") + assert(!partition1.getParametes.isEmpty) + assert(partition1.getParametes.get("dt") == "3") + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("getPartition") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + assertThrows[NoSuchPartitionException]( + catalog.getPartition(ident, Map("dt" -> "4").asJava)) + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("listPartitionNames") { + assert(catalog.listPartitionNames(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitionNames(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + val partitionNames = catalog.listPartitionNames(ident) + assert(partitionNames.nonEmpty) + assert(partitionNames.length == 1) + val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part1)) + val partitionNames1 = catalog.listPartitionNames(ident) + assert(partitionNames1.nonEmpty) + assert(partitionNames1.length == 2) + val partitionNames2 = catalog.listPartitionNames(ident, Map("dt" -> "3").asJava) + assert(partitionNames2.nonEmpty) + assert(partitionNames2.length == 1) + val partitionNames3 = catalog.listPartitionNames(ident, Map("dt" -> "5").asJava) + assert(partitionNames3.isEmpty) + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) + assert(catalog.listPartitionNames(ident).isEmpty) + } + + test("listPartitions") { + assert(catalog.listPartitions(ident).isEmpty) + val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part)) + assert(catalog.listPartitions(ident).nonEmpty) + val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) + assert(partition.getPartitionSpec.get("dt") == "3") + val partitions = catalog.listPartitions(ident) + assert(partitions.nonEmpty) + assert(partitions.length == 1) + val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) + catalog.createPartitions(ident, Array(part1)) + val 
partitions1 = catalog.listPartitions(ident) + assert(partitions1.nonEmpty) + assert(partitions1.length == 2) + val partitions2 = catalog.listPartitions(ident, Map("dt" -> "3").asJava) + assert(partitions2.nonEmpty) + assert(partitions2.length == 1) + val partitions3 = catalog.listPartitions(ident, Map("dt" -> "5").asJava) + assert(partitions3.isEmpty) + catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) + catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) + assert(catalog.listPartitions(ident).isEmpty) + } +} From 4a77db03ed8fb78774853768c90ac3cfcfde6892 Mon Sep 17 00:00:00 2001 From: stczwd Date: Sat, 23 May 2020 12:24:37 +0800 Subject: [PATCH 03/23] fix suite tests Change-Id: I39d0a5457b11dc071962f8e60d9a580fb9db1ed6 --- .../connector/InMemoryPartitionCatalog.scala | 56 ++++++++----------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala index 2324d49003eb8..1b1d8b64e57b0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala @@ -43,37 +43,26 @@ class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartiti ignoreIfExists: lang.Boolean = false): Unit = { assert(tableExists(ident)) val table = loadTable(ident).asInstanceOf[InMemoryTable] - val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) - checkPartitionKeysExists(partitionKeys, partitions.map(_.getPartitionSpec)) - val newPartitions = partitions.map { partition => - val partitionSpec = partition.getPartitionSpec - partitionKeys.filterNot(partitionSpec.keySet().contains) - .foreach { partitionKey => partitionSpec.put(partitionKey, "")} - partition.setPartitionSpec(partitionSpec) - partition - } + val tableSchema = table.schema.map(_.name) + checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec)) val tablePartitions = memoryTablePartitions.getOrDefault(ident, new mutable.HashSet[TablePartition]()) - newPartitions.foreach(tablePartitions.add) + partitions.foreach(tablePartitions.add) memoryTablePartitions.put(ident, tablePartitions) } def dropPartitions( ident: Identifier, partitions: Array[util.Map[String, String]], - ignoreIfNotExists: lang.Boolean = false, - purge: lang.Boolean = false, - retainData: lang.Boolean = false): Unit = { + ignoreIfNotExists: lang.Boolean = false): Unit = { assert(tableExists(ident)) val table = loadTable(ident).asInstanceOf[InMemoryTable] - val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) - checkPartitionKeysExists(partitionKeys, partitions) + val tableSchema = table.schema.map(_.name) + checkPartitionKeysExists(tableSchema, partitions) if (memoryTablePartitions.containsKey(ident)) { val tablePartitions = memoryTablePartitions.get(ident) tablePartitions.filter { tablePartition => - partitions.exists { partition => - partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) - } + partitions.contains(tablePartition.getPartitionSpec) }.foreach(tablePartitions.remove) memoryTablePartitions.put(ident, tablePartitions) } @@ -81,22 +70,21 @@ class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartiti def renamePartitions( ident: Identifier, - oldpartitions: Array[util.Map[String, String]], + oldPartitions: 
Array[util.Map[String, String]], newPartitions: Array[util.Map[String, String]]): Unit = { assert(tableExists(ident)) val table = loadTable(ident).asInstanceOf[InMemoryTable] - val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) - checkPartitionKeysExists(partitionKeys, oldpartitions) - checkPartitionKeysExists(partitionKeys, newPartitions) + val tableSchema = table.schema.map(_.name) + checkPartitionKeysExists(tableSchema, oldPartitions) + checkPartitionKeysExists(tableSchema, newPartitions) if (memoryTablePartitions.containsKey(ident)) { val tablePartitions = memoryTablePartitions.get(ident) - for (index <- oldpartitions.indices) { + for (oldPartition <- oldPartitions; + newPartition <- newPartitions) { tablePartitions.filter { tablePartition => - oldpartitions(index).asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) + oldPartition == tablePartition.getPartitionSpec }.foreach { tablePartition => - val newPartitionSpec = new util.HashMap[String, String](tablePartition.getPartitionSpec) - newPartitionSpec.putAll(newPartitions(index)) - tablePartition.setPartitionSpec(newPartitionSpec) + tablePartition.setPartitionSpec(newPartition) } } memoryTablePartitions.put(ident, tablePartitions) @@ -109,8 +97,8 @@ class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartiti assert(tableExists(ident)) assert(tableExists(ident)) val table = loadTable(ident).asInstanceOf[InMemoryTable] - val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) - checkPartitionKeysExists(partitionKeys, partitions.map(_.getPartitionSpec)) + val tableSchema = table.schema.map(_.name) + checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec)) if (memoryTablePartitions.containsKey(ident)) { val tablePartitions = memoryTablePartitions.get(ident) partitions.foreach { partition => @@ -126,8 +114,8 @@ class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartiti partition: util.Map[String, String]): TablePartition = { assert(tableExists(ident)) val table = loadTable(ident).asInstanceOf[InMemoryTable] - val partitionKeys = table.partitioning.map(_.references().head.fieldNames().head) - checkPartitionKeysExists(partitionKeys, Array(partition)) + val tableSchema = table.schema.map(_.name) + checkPartitionKeysExists(tableSchema, Array(partition)) memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) .find(_.getPartitionSpec == partition) .getOrElse { @@ -163,13 +151,13 @@ class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartiti } private def checkPartitionKeysExists( - partitionKeys: Array[String], + tableSchema: Seq[String], partitions: Array[util.Map[String, String]]): Unit = { partitions.foreach { partition => - val errorPartitionKeys = partition.keySet().asScala.filterNot(partitionKeys.contains) + val errorPartitionKeys = partition.keySet().asScala.filterNot(tableSchema.contains) if (errorPartitionKeys.nonEmpty) { throw new IllegalArgumentException( - s"Partition Keys not exists, table partitions: ${partitionKeys.mkString("{", ",", "}")}") + s"Partition Keys not exists, table schema: ${tableSchema.mkString("{", ",", "}")}") } } } From 9ff1c6c665e7544588a79cb790818a4d35f43bac Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 26 Jul 2020 19:51:07 +0800 Subject: [PATCH 04/23] rename partition API Change-Id: Ifa7655acf23f3ae6cfd70c41c91ee190ae78d4b8 --- .../connector/catalog/SupportsPartitions.java | 101 +++++----- 
 .../sql/connector/catalog/TablePartition.java      |  54 ------
 .../analysis/AlreadyExistException.scala           |  22 ++-
 .../analysis/NoSuchItemException.scala             |  19 +-
 .../connector/InMemoryPartitionCatalog.scala       | 164 -----------------
 .../connector/InMemoryPartitionTable.scala         | 108 +++++++++++
 .../catalog/SupportsPartitionsSuite.scala          | 174 ------------------
 7 files changed, 183 insertions(+), 459 deletions(-)
 delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java
 delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
 delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
index b548d873d3063..317bea3eed884 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
@@ -19,87 +19,78 @@
 import java.util.Map;
 
 import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
 
 /**
- * Catalog methods for working with Partitions.
+ * A partition interface of {@link Table} that exposes partition APIs.
+ * A partition is composed of an identifier and properties,
+ * and the properties contain the metadata of the partition.
+ *

+ * These APIs are used to modify table partition identifiers or partition metadata.
+ * In some cases, they will change the actual table data as well.
+ *
+ * @since 3.1.0
+ */
 @Experimental
-public interface SupportsPartitions extends TableCatalog {
-
-  /**
-   * Create partitions in an existing table, assuming it exists.
-   *
-   * @param ident a table identifier
-   * @param partitions transforms to use for partitioning data in the table
-   * @param ignoreIfExists
-   */
-  void createPartitions(
-      Identifier ident,
-      TablePartition[] partitions,
-      Boolean ignoreIfExists);
+public interface SupportsPartitions extends Table {
 
   /**
-   * Drop partitions from a table, assuming they exist.
+   * Create a partition in the table.
    *
-   * @param ident a table identifier
-   * @param partitions a list of string map for existing partitions
-   * @param ignoreIfNotExists
+   * @param ident a new partition identifier
+   * @param properties the metadata of a partition
+   * @throws PartitionAlreadyExistsException If a partition already exists for the identifier
    */
-  void dropPartitions(
-      Identifier ident,
-      Map[] partitions,
-      Boolean ignoreIfNotExists);
+  void createPartition(
+      InternalRow ident,
+      Map<String, String> properties) throws PartitionAlreadyExistsException;
 
   /**
-   * Override the specs of one or many existing table partitions, assuming they exist.
+   * Drop a partition from the table.
    *
-   * @param ident a table identifier
-   * @param oldpartitions a list of string map for existing partitions to be renamed
-   * @param newPartitions a list of string map for new partitions
+   * @param ident a partition identifier
+   * @return true if a partition was deleted, false if no partition exists for the identifier
    */
-  void renamePartitions(
-      Identifier ident,
-      Map[] oldpartitions,
-      Map[] newPartitions);
+  Boolean dropPartition(InternalRow ident);
 
   /**
-   * Alter one or many table partitions whose specs match those specified in `parts`,
-   * assuming the partitions exist.
+   * Rename a partition from the old identifier to the new identifier with no metadata changed.
    *
-   * @param ident a table identifier
-   * @param partitions transforms to use for partitioning data in the table
+   * @param oldIdent the partition identifier of the existing partition
+   * @param newIdent the new partition identifier of the partition
+   * @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
+   * @throws PartitionAlreadyExistsException If the new partition identifier already exists
    */
-  void alterPartitions(
-      Identifier ident,
-      TablePartition[] partitions);
+  void renamePartition(
+      InternalRow oldIdent,
+      InternalRow newIdent) throws NoSuchPartitionException, PartitionAlreadyExistsException;
 
   /**
-   * Retrieve the metadata of a table partition, assuming it exists.
+   * Replace the partition metadata of the existing partition.
    *
-   * @param ident a table identifier
-   * @param partition a list of string map for existing partitions
+   * @param ident the partition identifier of the existing partition
+   * @param properties the new metadata of the partition
+   * @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
    */
-  TablePartition getPartition(
-      Identifier ident,
-      Map partition);
+  void replacePartitionMetadata(
+      InternalRow ident,
+      Map<String, String> properties) throws NoSuchPartitionException;
 
   /**
-   * List the names of all partitions that belong to the specified table, assuming it exists.
+   * Retrieve the partition metadata of the existing partition.
    *
-   * @param ident a table identifier
-   * @param partition a list of string map for existing partitions
+   * @param ident a partition identifier
+   * @return the metadata of the partition
    */
-  String[] listPartitionNames(
-      Identifier ident,
-      Map partition);
+  Map<String, String> getPartitionMetadata(InternalRow ident);
 
   /**
-   * List the metadata of all partitions that belong to the specified table, assuming it exists.
+   * List the identifiers of all partitions that match the given prefix identifier in a table.
    *
-   * @param ident a table identifier
-   * @param partition a list of string map for existing partitions
+   * @param ident a prefix of a partition identifier
+   * @return an array of identifiers of the matching partitions
    */
-  TablePartition[] listPartitions(
-      Identifier ident,
-      Map partition);
+  InternalRow[] listPartitionIdentifiers(InternalRow ident);
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java
deleted file mode 100644
index 23c3295433a9e..0000000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TablePartition.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.spark.sql.connector.catalog; - -import java.util.HashMap; -import java.util.Map; - -public class TablePartition { - private Map partitionSpec; - private Map parametes; - - public TablePartition( - Map partitionSpec) { - this.partitionSpec = partitionSpec; - this.parametes = new HashMap(); - } - - public TablePartition( - Map partitionSpec, - Map parametes) { - this.partitionSpec = partitionSpec; - this.parametes = parametes; - } - - public Map getPartitionSpec() { - return partitionSpec; - } - - public void setPartitionSpec(Map partitionSpec) { - this.partitionSpec = partitionSpec; - } - - public Map getParametes() { - return parametes; - } - - public void setParameters(Map parametes) { - this.parametes = parametes; - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index 7e5d56a7d1196..8728515d971e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.types.StructType /** * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception @@ -48,14 +50,22 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes class TempTableAlreadyExistsException(table: String) extends TableAlreadyExistsException(s"Temporary view '$table' already exists") -class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec) - extends AnalysisException( - s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) +class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) { + def this(db: String, table: String, spec: TablePartitionSpec) = { + this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) + } -class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) - extends AnalysisException( - s"The following partitions already exists in table '$table' database '$db':\n" + def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { + this(s"The following partitions already exists in table '$table' database '$db':\n" + specs.mkString("\n===\n")) + } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { + this(s"Partition " + + s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")} in $tableName exists") + } +} class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 9b5b059908c00..5d0fa9a9cfd8e 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.types.StructType /** @@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) { } } -class NoSuchPartitionException( - db: String, - table: String, - spec: TablePartitionSpec) - extends AnalysisException( - s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n")) +class NoSuchPartitionException(message: String) extends AnalysisException(message) { + def this(db: String, table: String, spec: TablePartitionSpec) = { + this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n")) + } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { + this(s"Partition not found in table $tableName: " + + s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")}") + } +} class NoSuchPermanentFunctionException(db: String, func: String) extends AnalysisException(s"Function '$func' not found in database '$db'") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala deleted file mode 100644 index 1b1d8b64e57b0..0000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionCatalog.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector - -import java.{lang, util} -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException -import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitions, TablePartition} - - -/** - * This class is used to test SupportsPartitions API. 
- */ -class InMemoryPartitionCatalog extends InMemoryTableCatalog with SupportsPartitions { - - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - - protected val memoryTablePartitions: util.Map[Identifier, mutable.HashSet[TablePartition]] = - new ConcurrentHashMap[Identifier, mutable.HashSet[TablePartition]]() - - def createPartitions( - ident: Identifier, - partitions: Array[TablePartition], - ignoreIfExists: lang.Boolean = false): Unit = { - assert(tableExists(ident)) - val table = loadTable(ident).asInstanceOf[InMemoryTable] - val tableSchema = table.schema.map(_.name) - checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec)) - val tablePartitions = - memoryTablePartitions.getOrDefault(ident, new mutable.HashSet[TablePartition]()) - partitions.foreach(tablePartitions.add) - memoryTablePartitions.put(ident, tablePartitions) - } - - def dropPartitions( - ident: Identifier, - partitions: Array[util.Map[String, String]], - ignoreIfNotExists: lang.Boolean = false): Unit = { - assert(tableExists(ident)) - val table = loadTable(ident).asInstanceOf[InMemoryTable] - val tableSchema = table.schema.map(_.name) - checkPartitionKeysExists(tableSchema, partitions) - if (memoryTablePartitions.containsKey(ident)) { - val tablePartitions = memoryTablePartitions.get(ident) - tablePartitions.filter { tablePartition => - partitions.contains(tablePartition.getPartitionSpec) - }.foreach(tablePartitions.remove) - memoryTablePartitions.put(ident, tablePartitions) - } - } - - def renamePartitions( - ident: Identifier, - oldPartitions: Array[util.Map[String, String]], - newPartitions: Array[util.Map[String, String]]): Unit = { - assert(tableExists(ident)) - val table = loadTable(ident).asInstanceOf[InMemoryTable] - val tableSchema = table.schema.map(_.name) - checkPartitionKeysExists(tableSchema, oldPartitions) - checkPartitionKeysExists(tableSchema, newPartitions) - if (memoryTablePartitions.containsKey(ident)) { - val tablePartitions = memoryTablePartitions.get(ident) - for (oldPartition <- oldPartitions; - newPartition <- newPartitions) { - tablePartitions.filter { tablePartition => - oldPartition == tablePartition.getPartitionSpec - }.foreach { tablePartition => - tablePartition.setPartitionSpec(newPartition) - } - } - memoryTablePartitions.put(ident, tablePartitions) - } - } - - def alterPartitions( - ident: Identifier, - partitions: Array[TablePartition]): Unit = { - assert(tableExists(ident)) - assert(tableExists(ident)) - val table = loadTable(ident).asInstanceOf[InMemoryTable] - val tableSchema = table.schema.map(_.name) - checkPartitionKeysExists(tableSchema, partitions.map(_.getPartitionSpec)) - if (memoryTablePartitions.containsKey(ident)) { - val tablePartitions = memoryTablePartitions.get(ident) - partitions.foreach { partition => - tablePartitions.filter(_.getPartitionSpec == partition.getPartitionSpec) - .foreach(tablePartitions.remove) - tablePartitions.add(partition) - } - } - } - - def getPartition( - ident: Identifier, - partition: util.Map[String, String]): TablePartition = { - assert(tableExists(ident)) - val table = loadTable(ident).asInstanceOf[InMemoryTable] - val tableSchema = table.schema.map(_.name) - checkPartitionKeysExists(tableSchema, Array(partition)) - memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) - .find(_.getPartitionSpec == partition) - .getOrElse { - throw new NoSuchPartitionException( - ident.namespace().quoted, - ident.name(), - partition.asScala.toMap) - } - } - - def listPartitionNames( - ident: 
Identifier, - partition: util.Map[String, String] = new util.HashMap()): Array[String] = { - assert(tableExists(ident)) - memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) - .filter { tablePartition => - partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) - }.map { tablePartition => - tablePartition.getPartitionSpec.asScala.map { kv => - s"${kv._1}=${kv._2}" - }.mkString("/") - }.toArray - } - - def listPartitions( - ident: Identifier, - partition: util.Map[String, String] = new util.HashMap()): Array[TablePartition] = { - assert(tableExists(ident)) - memoryTablePartitions.getOrDefault(ident, mutable.HashSet.empty[TablePartition]) - .filter { tablePartition => - partition.asScala.toSet.subsetOf(tablePartition.getPartitionSpec.asScala.toSet) - }.toArray - } - - private def checkPartitionKeysExists( - tableSchema: Seq[String], - partitions: Array[util.Map[String, String]]): Unit = { - partitions.foreach { partition => - val errorPartitionKeys = partition.keySet().asScala.filterNot(tableSchema.contains) - if (errorPartitionKeys.nonEmpty) { - throw new IllegalArgumentException( - s"Partition Keys not exists, table schema: ${tableSchema.mkString("{", ",", "}")}") - } - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala new file mode 100644 index 0000000000000..fc6e8269becb0 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.{lang, util} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.SupportsPartitions +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType + + +/** + * This class is used to test SupportsPartitions API. 
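+ * A minimal usage sketch (assumes a table partitioned by a single string column such as
+ * "dt"; `UTF8String` values stand in for partition column data and would need
+ * `org.apache.spark.unsafe.types.UTF8String` imported):
+ * {{{
+ *   val partIdent = InternalRow(UTF8String.fromString("2020-05-22"))
+ *   table.createPartition(partIdent, util.Collections.emptyMap[String, String])
+ *   assert(table.getPartitionMetadata(partIdent).isEmpty)
+ *   assert(table.dropPartition(partIdent))
+ * }}}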
+ */
+class InMemoryPartitionTable(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitions {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
+    new ConcurrentHashMap[InternalRow, util.Map[String, String]]()
+
+  private val partCols: StructType = {
+    val partitionColumnNames = partitioning.toSeq.asPartitionColumns
+    new StructType(schema.filter(partitionColumnNames.contains).toArray)
+  }
+
+  def createPartition(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      throw new PartitionAlreadyExistsException(name, ident, partCols)
+    } else {
+      // Store the supplied metadata for the new partition.
+      memoryTablePartitions.put(ident, properties)
+    }
+  }
+
+  def dropPartition(ident: InternalRow): lang.Boolean = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      memoryTablePartitions.remove(ident)
+      true
+    } else {
+      false
+    }
+  }
+
+  def renamePartition(oldIdent: InternalRow, newIdent: InternalRow): Unit = {
+    if (!memoryTablePartitions.containsKey(oldIdent)) {
+      throw new NoSuchPartitionException(name, oldIdent, partCols)
+    } else if (memoryTablePartitions.containsKey(newIdent)) {
+      throw new PartitionAlreadyExistsException(name, newIdent, partCols)
+    } else {
+      val partitionMetadata = memoryTablePartitions.get(oldIdent)
+      memoryTablePartitions.remove(oldIdent)
+      memoryTablePartitions.put(newIdent, partitionMetadata)
+    }
+  }
+
+  def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    if (!memoryTablePartitions.containsKey(ident)) {
+      throw new NoSuchPartitionException(name, ident, partCols)
+    } else {
+      memoryTablePartitions.put(ident, properties)
+    }
+  }
+
+  def getPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+    if (!memoryTablePartitions.containsKey(ident)) {
+      throw new NoSuchPartitionException(name, ident, partCols)
+    } else {
+      memoryTablePartitions.get(ident)
+    }
+  }
+
+  def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = {
+    val prefixPartCols =
+      new StructType(partCols.dropRight(partCols.length - ident.numFields).toArray)
+    val prefixPart = ident.toSeq(prefixPartCols)
+    memoryTablePartitions.keySet().asScala
+      .filter(_.toSeq(partCols).startsWith(prefixPart)).toArray
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala
deleted file mode 100644
index 53a1b25c54336..0000000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException -import org.apache.spark.sql.connector.InMemoryPartitionCatalog -import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -class SupportsPartitionsSuite extends SparkFunSuite { - - private val ident: Identifier = Identifier.of(Array("ns"), "test_table") - def ref(name: String): NamedReference = LogicalExpressions.parseReference(name) - - private val catalog: InMemoryPartitionCatalog = { - val newCatalog = new InMemoryPartitionCatalog - newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) - newCatalog.createTable( - ident, - new StructType() - .add("id", IntegerType) - .add("data", StringType) - .add("dt", StringType), - Array(LogicalExpressions.identity(ref("dt"))), - util.Collections.emptyMap[String, String]) - newCatalog - } - - test("createPartitions") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("dropPartitions") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - catalog.createPartitions(ident, Array(part1)) - assert(catalog.listPartitionNames(ident).length == 2) - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - assert(catalog.listPartitionNames(ident).length == 1) - catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) - assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("renamePartition") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - catalog.renamePartitions( - ident, - Array(Map("dt" -> "3").asJava), - Array(Map("dt" -> "4").asJava)) - val partition1 = catalog.getPartition(ident, Map("dt" -> "4").asJava) - assert(partition1.getPartitionSpec.get("dt") == "4") - assertThrows[NoSuchPartitionException]( - catalog.getPartition(ident, Map("dt" -> "3").asJava)) - catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) - 
assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("alterPartition") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - assert(partition.getParametes.isEmpty) - val part1 = new TablePartition(Map("dt" -> "3").asJava, Map("dt" -> "3").asJava) - catalog.alterPartitions(ident, Array(part1)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition1 = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition1.getPartitionSpec.get("dt") == "3") - assert(!partition1.getParametes.isEmpty) - assert(partition1.getParametes.get("dt") == "3") - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("getPartition") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - assertThrows[NoSuchPartitionException]( - catalog.getPartition(ident, Map("dt" -> "4").asJava)) - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("listPartitionNames") { - assert(catalog.listPartitionNames(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitionNames(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - val partitionNames = catalog.listPartitionNames(ident) - assert(partitionNames.nonEmpty) - assert(partitionNames.length == 1) - val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part1)) - val partitionNames1 = catalog.listPartitionNames(ident) - assert(partitionNames1.nonEmpty) - assert(partitionNames1.length == 2) - val partitionNames2 = catalog.listPartitionNames(ident, Map("dt" -> "3").asJava) - assert(partitionNames2.nonEmpty) - assert(partitionNames2.length == 1) - val partitionNames3 = catalog.listPartitionNames(ident, Map("dt" -> "5").asJava) - assert(partitionNames3.isEmpty) - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) - assert(catalog.listPartitionNames(ident).isEmpty) - } - - test("listPartitions") { - assert(catalog.listPartitions(ident).isEmpty) - val part = new TablePartition(Map("dt" -> "3").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part)) - assert(catalog.listPartitions(ident).nonEmpty) - val partition = catalog.getPartition(ident, Map("dt" -> "3").asJava) - assert(partition.getPartitionSpec.get("dt") == "3") - val partitions = catalog.listPartitions(ident) - assert(partitions.nonEmpty) - assert(partitions.length == 1) - val part1 = new TablePartition(Map("dt" -> "4").asJava, new util.HashMap[String, String]()) - catalog.createPartitions(ident, Array(part1)) - val 
partitions1 = catalog.listPartitions(ident) - assert(partitions1.nonEmpty) - assert(partitions1.length == 2) - val partitions2 = catalog.listPartitions(ident, Map("dt" -> "3").asJava) - assert(partitions2.nonEmpty) - assert(partitions2.length == 1) - val partitions3 = catalog.listPartitions(ident, Map("dt" -> "5").asJava) - assert(partitions3.isEmpty) - catalog.dropPartitions(ident, Array(Map("dt" -> "3").asJava)) - catalog.dropPartitions(ident, Array(Map("dt" -> "4").asJava)) - assert(catalog.listPartitions(ident).isEmpty) - } -} From f9288aae53cf9591b7f7807dabf472b1b008ce8d Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 26 Jul 2020 20:19:02 +0800 Subject: [PATCH 05/23] fix code error Change-Id: I1438992637bfb20a68b71c078610171fd576ade8 --- .../catalyst/analysis/AlreadyExistException.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index 8728515d971e5..3b0d356f2a01a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -55,17 +55,16 @@ class PartitionAlreadyExistsException(message: String) extends AnalysisException this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) } - def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { - this(s"The following partitions already exists in table '$table' database '$db':\n" - + specs.mkString("\n===\n")) - } - def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { - this(s"Partition " + - s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) - .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")} in $tableName exists") + this(s"Partition already exists in table $tableName:" + + partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")) } } +class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) + extends AnalysisException( + s"The following partitions already exists in table '$table' database '$db':\n" + class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") From 866f46af3ead0da1a59a21d69b3d4631b5dc9fc8 Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 26 Jul 2020 21:08:30 +0800 Subject: [PATCH 06/23] fix error Change-Id: I510fabc80caec1a4514273970fc72f6f8fa17d76 --- .../spark/sql/catalyst/analysis/AlreadyExistException.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index 3b0d356f2a01a..cb8972416de32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -64,7 +64,7 @@ class PartitionAlreadyExistsException(message: String) extends AnalysisException class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) extends AnalysisException( - s"The following partitions 
already exists in table '$table' database '$db':\n" + s"The following partitions already exists in table '$table' database '$db':\n") class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") From 0fda12344d2f51bcd411dbf06beed5307f1aaa13 Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 26 Jul 2020 22:57:19 +0800 Subject: [PATCH 07/23] change comment Change-Id: I9ea38153ec9153e62cf93fd6f90b57579205adf6 --- .../apache/spark/sql/connector/catalog/SupportsPartitions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java index 317bea3eed884..483c03d1f39da 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; /** - * A partition interface of {@link Table} to indicate partition APIs. + * A partition interface of {@link Table}. * A partition is composed of identifier and properties, * and properties contains metadata information of the partition. *

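[Editorial sketch — not part of the patch series] For orientation between revisions: the single-partition API as it stands after PATCH 07 can be exercised through the in-memory test table added earlier in the series. A minimal Scala sketch; the driver object, column layout, and partition value are illustrative only, and it deliberately avoids getPartitionMetadata/replacePartitionMetadata, whose inverted existence checks are only corrected in PATCH 09 below:

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.InMemoryPartitionTable
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object PartitionApiSketch extends App {
  // A table partitioned by identity(dt), mirroring the test suites in this series.
  val schema = new StructType()
    .add("id", IntegerType)
    .add("dt", StringType)
  val partitioning: Array[Transform] =
    Array(LogicalExpressions.identity(LogicalExpressions.parseReference("dt")))
  val table = new InMemoryPartitionTable(
    "test_table", schema, partitioning, new util.HashMap[String, String]())

  // A partition identifier is an InternalRow over the partition columns.
  val ident = InternalRow.apply("3")
  table.createPartition(ident, new util.HashMap[String, String]())
  assert(table.listPartitionIdentifiers(InternalRow.empty).length == 1)

  // dropPartition reports whether a partition was actually removed.
  assert(table.dropPartition(ident))
  assert(!table.dropPartition(ident))
}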
From e616e751302b18d167b1d717418c340dadb7a0fd Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 30 Jul 2020 21:38:30 +0800 Subject: [PATCH 08/23] fix r test error Change-Id: Ic895676858a9ff3166e60f09e57012a8a29b43b3 --- .../spark/sql/catalyst/analysis/AlreadyExistException.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index cb8972416de32..e03a0cf056f5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -64,7 +64,8 @@ class PartitionAlreadyExistsException(message: String) extends AnalysisException class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) extends AnalysisException( - s"The following partitions already exists in table '$table' database '$db':\n") + s"The following partitions already exists in table '$table' database '$db':\n" + + specs.mkString("\n===\n")) class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") From 2d838a476454b61f6ba8d1128dbeb3e20a5fb948 Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 30 Jul 2020 23:11:15 +0800 Subject: [PATCH 09/23] add suites Change-Id: If2001d0d5b8fe31a1d6f0f993f6c0bc0f2783247 --- .../connector/InMemoryPartitionTable.scala | 14 +- .../catalog/SupportsPartitionsSuite.scala | 165 ++++++++++++++++++ 2 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index fc6e8269becb0..5a5f9bde9ea5d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -55,9 +55,7 @@ class InMemoryPartitionTable( if (memoryTablePartitions.containsKey(ident)) { throw new PartitionAlreadyExistsException(name, ident, partCols) } else { - val tablePartitions = - memoryTablePartitions.getOrDefault(ident, Map.empty[String, String].asJava) - memoryTablePartitions.put(ident, tablePartitions) + memoryTablePartitions.put(ident, properties) } } @@ -84,17 +82,17 @@ class InMemoryPartitionTable( def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partCols) - } else { memoryTablePartitions.put(ident, properties) + } else { + throw new NoSuchPartitionException(name, ident, partCols) } } def getPartitionMetadata(ident: InternalRow): util.Map[String, String] = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partCols) - } else { memoryTablePartitions.get(ident) + } else { + throw new NoSuchPartitionException(name, ident, partCols) } } @@ -105,4 +103,6 @@ class InMemoryPartitionTable( memoryTablePartitions.keySet().asScala .filter(_.toSeq(partCols).startsWith(prefixPart)).toArray } + + def partitionExists(ident: InternalRow): Boolean = 
memoryTablePartitions.containsKey(ident) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala new file mode 100644 index 0000000000000..4ffed1f92fea7 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException +import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog} +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class SupportsPartitionsSuite extends SparkFunSuite { + + private val ident: Identifier = Identifier.of(Array("ns"), "test_table") + + def ref(name: String): NamedReference = LogicalExpressions.parseReference(name) + + private val catalog: InMemoryTableCatalog = { + val newCatalog = new InMemoryTableCatalog + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog.createTable( + ident, + new StructType() + .add("id", IntegerType) + .add("data", StringType) + .add("dt", StringType), + Array(LogicalExpressions.identity(ref("dt"))), + util.Collections.emptyMap[String, String]) + newCatalog + } + + test("createPartition") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("dropPartition") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + val partIdent1 = InternalRow.apply("4") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + partTable.createPartition(partIdent1, new 
util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2) + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1) + partTable.dropPartition(partIdent1) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("renamePartition") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + + val newPartIdent = InternalRow.apply("4") + partTable.renamePartition(partIdent, newPartIdent) + assert(partTable.partitionExists(newPartIdent)) + assertThrows[NoSuchPartitionException]( + partTable.getPartitionMetadata(partIdent)) + + partTable.dropPartition(newPartIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("replacePartitionMetadata") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + assert(partTable.getPartitionMetadata(partIdent).isEmpty) + + partTable.replacePartitionMetadata(partIdent, Map("paramKey" -> "paramValue").asJava) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + assert(!partTable.getPartitionMetadata(partIdent).isEmpty) + assert(partTable.getPartitionMetadata(partIdent).get("paramKey") == "paramValue") + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("getPartitionMetadata") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + partTable.createPartition(partIdent, Map("paramKey" -> "paramValue").asJava) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + assert(!partTable.getPartitionMetadata(partIdent).isEmpty) + assert(partTable.getPartitionMetadata(partIdent).get("paramKey") == "paramValue") + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("listPartitionNames") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("3") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1) + + val partIdent1 = InternalRow.apply("4") + partTable.createPartition(partIdent1, new 
util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2) + assert(partTable.listPartitionIdentifiers(partIdent1).length == 1) + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1) + partTable.dropPartition(partIdent1) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } +} From 9a3664fecb3ac35ce6293fc9f3aa5f8e4af471ad Mon Sep 17 00:00:00 2001 From: stczwd Date: Fri, 31 Jul 2020 09:44:03 +0800 Subject: [PATCH 10/23] change partition API and docs Change-Id: Iea2ccc39eaf776b51851af0802c711811bce8eca --- .../connector/catalog/SupportsPartitions.java | 46 +++++++++++-------- .../connector/InMemoryPartitionTable.scala | 31 ++++--------- .../catalog/SupportsPartitionsSuite.scala | 34 +++----------- 3 files changed, 41 insertions(+), 70 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java index 483c03d1f39da..0d33bd895bfc1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java @@ -22,30 +22,44 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; +import org.apache.spark.sql.types.StructType; /** * A partition interface of {@link Table}. * A partition is composed of identifier and properties, * and properties contains metadata information of the partition. *

- * These APIs are used to modify table partition identifier or partition metadata,
- * in some cases, they will change actual value of table data as well.
+ * These APIs are used to modify table partition identifier or partition metadata.
+ * In some cases, they will change the table data as well.
+ * {@link #createPartition}:
+ *   add a partition and any data that its location contains to the table
+ * {@link #dropPartition}:
+ *   remove a partition and any data it contains from the table
+ * {@link #replacePartitionMetadata}:
+ *   point a partition to a new location, which will swap one location's data for the other
  *
  * @since 3.0.0
  */
 @Experimental
-public interface SupportsPartitions extends Table {
+public interface SupportsPartitions extends Table {
+
+  /**
+   * @return the partition schema of the table
+   */
+  StructType partitionSchema();
+
   /**
    * Create a partition in table.
    *
    * @param ident a new partition identifier
    * @param properties the metadata of a partition
    * @throws PartitionAlreadyExistsException If a partition already exists for the identifier
+   * @throws UnsupportedOperationException If partition property is not supported
    */
   void createPartition(
-      InternalRow ident,
-      Map<String, String> properties) throws PartitionAlreadyExistsException;
+      InternalRow ident,
+      Map<String, String> properties)
+      throws PartitionAlreadyExistsException, UnsupportedOperationException;
 
   /**
    * Drop a partition from table.
@@ -53,19 +67,7 @@ void createPartition(
    * @param ident a partition identifier
    * @return true if a partition was deleted, false if no partition exists for the identifier
    */
-  Boolean dropPartition(InternalRow ident);
-
-  /**
-   * Rename a Partition from old identifier to new identifier with no metadata changed.
-   *
-   * @param oldIdent the partition identifier of the existing partition
-   * @param newIdent the new partition identifier of the partition
-   * @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
-   * @throws PartitionAlreadyExistsException If the new partition identifier already exists
-   */
-  void renamePartition(
-      InternalRow oldIdent,
-      InternalRow newIdent) throws NoSuchPartitionException, PartitionAlreadyExistsException;
+  boolean dropPartition(InternalRow ident);
 
   /**
    * Replace the partition metadata of the existing partition.
@@ -73,18 +75,22 @@ void renamePartition(
    * @param ident the partition identifier of the existing partition
    * @param properties the new metadata of the partition
    * @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
+   * @throws UnsupportedOperationException If partition property is not supported
    */
   void replacePartitionMetadata(
-      InternalRow ident,
-      Map<String, String> properties) throws NoSuchPartitionException;
+      InternalRow ident,
+      Map<String, String> properties)
+      throws NoSuchPartitionException, UnsupportedOperationException;
 
   /**
    * Retrieve the partition metadata of the existing partition.
    *
    * @param ident a partition identifier
    * @return the metadata of the partition
+   * @throws UnsupportedOperationException If partition property is not supported
    */
-  Map<String, String> getPartitionMetadata(InternalRow ident);
+  Map<String, String> loadPartitionMetadata(InternalRow ident)
+      throws UnsupportedOperationException;
 
   /**
    * List the identifiers of all partitions that contain the ident in a table.
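[Editorial sketch — not part of the patch series] Caller-side view of the interface as reworked above, shown here before the matching test-table changes that follow. A minimal Scala sketch; the driver object and the metadata key/values are illustrative, and the metadata round trip relies on the fixes from PATCH 09:

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.InMemoryPartitionTable
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform}
import org.apache.spark.sql.types.{StringType, StructType}

object RevisedPartitionApiSketch extends App {
  val schema = new StructType().add("data", StringType).add("dt", StringType)
  val partitioning: Array[Transform] =
    Array(LogicalExpressions.identity(LogicalExpressions.parseReference("dt")))
  val table = new InMemoryPartitionTable(
    "test_table", schema, partitioning, new util.HashMap[String, String]())

  val ident = InternalRow.apply("2020-07-31")

  // Metadata now travels with the partition from creation on.
  table.createPartition(ident, Map("location" -> "/tmp/dt=2020-07-31").asJava)
  assert(table.loadPartitionMetadata(ident).get("location") == "/tmp/dt=2020-07-31")

  // replacePartitionMetadata swaps in the new metadata map wholesale.
  table.replacePartitionMetadata(ident, Map("location" -> "/tmp/other").asJava)
  assert(table.loadPartitionMetadata(ident).get("location") == "/tmp/other")

  // dropPartition now returns a primitive boolean.
  assert(table.dropPartition(ident))
}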
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index 5a5f9bde9ea5d..8e31088316912 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.connector -import java.{lang, util} +import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -38,13 +38,12 @@ class InMemoryPartitionTable( partitioning: Array[Transform], properties: util.Map[String, String]) extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitions { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] = new ConcurrentHashMap[InternalRow, util.Map[String, String]]() - private val partCols: StructType = { + def partitionSchema: StructType = { val partitionColumnNames = partitioning.toSeq.asPartitionColumns new StructType(schema.filter(partitionColumnNames.contains).toArray) } @@ -53,13 +52,13 @@ class InMemoryPartitionTable( ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partCols) + throw new PartitionAlreadyExistsException(name, ident, partitionSchema) } else { memoryTablePartitions.put(ident, properties) } } - def dropPartition(ident: InternalRow): lang.Boolean = { + def dropPartition(ident: InternalRow): Boolean = { if (memoryTablePartitions.containsKey(ident)) { memoryTablePartitions.remove(ident) true @@ -68,40 +67,28 @@ class InMemoryPartitionTable( } } - def renamePartition(oldIdent: InternalRow, newIdent: InternalRow): Unit = { - if (!memoryTablePartitions.containsKey(oldIdent)) { - throw new NoSuchPartitionException(name, oldIdent, partCols) - } else if (memoryTablePartitions.containsKey(newIdent)) { - throw new PartitionAlreadyExistsException(name, newIdent, partCols) - } else { - val partitionMetadata = memoryTablePartitions.get(oldIdent) - memoryTablePartitions.remove(oldIdent) - memoryTablePartitions.put(newIdent, partitionMetadata) - } - } - def replacePartitionMetadata(ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { memoryTablePartitions.put(ident, properties) } else { - throw new NoSuchPartitionException(name, ident, partCols) + throw new NoSuchPartitionException(name, ident, partitionSchema) } } - def getPartitionMetadata(ident: InternalRow): util.Map[String, String] = { + def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = { if (memoryTablePartitions.containsKey(ident)) { memoryTablePartitions.get(ident) } else { - throw new NoSuchPartitionException(name, ident, partCols) + throw new NoSuchPartitionException(name, ident, partitionSchema) } } def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = { val prefixPartCols = - new StructType(partCols.dropRight(partCols.length - ident.numFields).toArray) + new StructType(partitionSchema.dropRight(partitionSchema.length - ident.numFields).toArray) val prefixPart = ident.toSeq(prefixPartCols) memoryTablePartitions.keySet().asScala - .filter(_.toSeq(partCols).startsWith(prefixPart)).toArray + 
.filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray } def partitionExists(ident: InternalRow): Boolean = memoryTablePartitions.containsKey(ident) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala index 4ffed1f92fea7..a9cac1c7358ba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} @@ -82,27 +81,6 @@ class SupportsPartitionsSuite extends SparkFunSuite { assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) } - test("renamePartition") { - val table = catalog.loadTable(ident) - val partTable = new InMemoryPartitionTable( - table.name(), table.schema(), table.partitioning(), table.properties()) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - - val partIdent = InternalRow.apply("3") - partTable.createPartition(partIdent, new util.HashMap[String, String]()) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) - assert(partTable.partitionExists(partIdent)) - - val newPartIdent = InternalRow.apply("4") - partTable.renamePartition(partIdent, newPartIdent) - assert(partTable.partitionExists(newPartIdent)) - assertThrows[NoSuchPartitionException]( - partTable.getPartitionMetadata(partIdent)) - - partTable.dropPartition(newPartIdent) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - } - test("replacePartitionMetadata") { val table = catalog.loadTable(ident) val partTable = new InMemoryPartitionTable( @@ -113,19 +91,19 @@ class SupportsPartitionsSuite extends SparkFunSuite { partTable.createPartition(partIdent, new util.HashMap[String, String]()) assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) assert(partTable.partitionExists(partIdent)) - assert(partTable.getPartitionMetadata(partIdent).isEmpty) + assert(partTable.loadPartitionMetadata(partIdent).isEmpty) partTable.replacePartitionMetadata(partIdent, Map("paramKey" -> "paramValue").asJava) assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) assert(partTable.partitionExists(partIdent)) - assert(!partTable.getPartitionMetadata(partIdent).isEmpty) - assert(partTable.getPartitionMetadata(partIdent).get("paramKey") == "paramValue") + assert(!partTable.loadPartitionMetadata(partIdent).isEmpty) + assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue") partTable.dropPartition(partIdent) assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) } - test("getPartitionMetadata") { + test("loadPartitionMetadata") { val table = catalog.loadTable(ident) val partTable = new InMemoryPartitionTable( table.name(), table.schema(), table.partitioning(), table.properties()) @@ -135,8 +113,8 @@ class SupportsPartitionsSuite extends SparkFunSuite { partTable.createPartition(partIdent, Map("paramKey" -> "paramValue").asJava) 
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
     assert(partTable.partitionExists(partIdent))
-    assert(!partTable.getPartitionMetadata(partIdent).isEmpty)
-    assert(partTable.getPartitionMetadata(partIdent).get("paramKey") == "paramValue")
+    assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
+    assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")
 
     partTable.dropPartition(partIdent)
     assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)

From f6848ed9cf9f99dd8eaed3eacdcc7cbd9a30b504 Mon Sep 17 00:00:00 2001
From: stczwd
Date: Fri, 31 Jul 2020 11:40:18 +0800
Subject: [PATCH 11/23] change suite test name

Change-Id: I1778afc8435ffb20e18baeaa130527157a42520b
---
 .../spark/sql/connector/catalog/SupportsPartitionsSuite.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala
index a9cac1c7358ba..91cf8718e1249 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala
@@ -120,7 +120,7 @@ class SupportsPartitionsSuite extends SparkFunSuite {
     assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
   }
 
-  test("listPartitionNames") {
+  test("listPartitionIdentifiers") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
       table.name(), table.schema(), table.partitioning(), table.properties())

From 282b8136bbd3696dd43cebb528aa5bbf067fb7e7 Mon Sep 17 00:00:00 2001
From: stczwd
Date: Sat, 1 Aug 2020 06:05:43 +0800
Subject: [PATCH 12/23] change support version

Change-Id: I0b4ca52f1f7b857ff464d1ae8ac50a9086823957
---
 .../apache/spark/sql/connector/catalog/SupportsPartitions.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
index 0d33bd895bfc1..fd7ceea01b8cf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
@@ -38,7 +38,7 @@
  * {@link #replacePartitionMetadata}:
  *   point a partition to a new location, which will swap one location's data for the other
  *
- * @since 3.0.0
+ * @since 3.1.0
  */
 @Experimental
 public interface SupportsPartitions extends Table {

From 825d2e9c2dd269f39f71a34fa7719e707c55af2e Mon Sep 17 00:00:00 2001
From: stczwd
Date: Sun, 2 Aug 2020 09:49:31 +0800
Subject: [PATCH 13/23] change comment from rename to alter

Change-Id: Ibad5036a43ab240f99cb0c00f4d2eeb351204bed
---
 .../apache/spark/sql/connector/catalog/SupportsPartitions.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
index fd7ceea01b8cf..e61bcc1411c55 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
@@ -74,7 +74,7 @@ void createPartition(
    *
    * @param ident the partition
identifier of the existing partition * @param properties the new metadata of the partition - * @throws NoSuchPartitionException If the partition identifier to rename doesn't exist + * @throws NoSuchPartitionException If the partition identifier to alter doesn't exist * @throws UnsupportedOperationException If partition property is not supported */ void replacePartitionMetadata( From 974f29a0cb39c187aac90158110b76a42231535c Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 2 Aug 2020 23:46:29 +0800 Subject: [PATCH 14/23] retest Change-Id: I1c71e0ad97bac4298caabdc980155f0e1ec428ea --- .../org/apache/spark/sql/connector/InMemoryPartitionTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index 8e31088316912..b435817b8beab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -45,7 +45,7 @@ class InMemoryPartitionTable( def partitionSchema: StructType = { val partitionColumnNames = partitioning.toSeq.asPartitionColumns - new StructType(schema.filter(partitionColumnNames.contains).toArray) + new StructType(schema.filter(p => partitionColumnNames.contains(p.name)).toArray) } def createPartition( From ad84016bde2aefb68912f4c4dd099e459a3cbf84 Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 6 Aug 2020 19:39:53 +0800 Subject: [PATCH 15/23] add SupportsAtomicPartitionManagement API support Change-Id: I7b782bfafe77e62b842fd6533347d6c705c62033 --- .../SupportsAtomicPartitionManagement.java | 87 +++++++++ ....java => SupportsPartitionManagement.java} | 13 +- .../analysis/AlreadyExistException.scala | 14 +- .../analysis/NoSuchItemException.scala | 20 +- .../InMemoryAtomicPartitionTable.scala | 70 +++++++ .../connector/InMemoryPartitionTable.scala | 10 +- ...pportsAtomicPartitionManagementSuite.scala | 181 ++++++++++++++++++ ...=> SupportsPartitionManagementSuite.scala} | 2 +- 8 files changed, 381 insertions(+), 16 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/{SupportsPartitions.java => SupportsPartitionManagement.java} (90%) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala rename sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/{SupportsPartitionsSuite.scala => SupportsPartitionManagementSuite.scala} (98%) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java new file mode 100644 index 0000000000000..3a40af11ff44c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException; +import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; + +/** + * An atomic partition interface of {@link Table} to operate multiple partitions atomically. + *

+ * These APIs are used to modify table partitions or partition metadata;
+ * they will change the table data as well.
+ * {@link #createPartitions}:
+ *   add an array of partitions and any data their locations contain to the table
+ * {@link #dropPartitions}:
+ *   remove an array of partitions and any data they contain from the table
+ * {@link #replacePartitionMetadatas}:
+ *   point an array of partitions to new locations, which will swap each location's data for the other
+ *
+ * @since 3.1.0
+ */
+@Experimental
+public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement {
+
+  /**
+   * Create an array of partitions atomically in the table.
+   *

+   * If any partition already exists,
+   * the operation of createPartitions needs to be safely rolled back.
+   *
+   * @param idents an array of new partition identifiers
+   * @param properties the metadata of the partitions
+   * @throws PartitionsAlreadyExistException If any partition already exists for the identifier
+   * @throws UnsupportedOperationException If partition property is not supported
+   */
+  void createPartitions(
+      InternalRow[] idents,
+      Map<String, String>[] properties)
+      throws PartitionsAlreadyExistException, UnsupportedOperationException;
+
+  /**
+   * Drop an array of partitions atomically from the table.
+   *

+   * If any partition doesn't exist,
+   * the operation of dropPartitions needs to be safely rolled back.
+   *
+   * @param idents an array of partition identifiers
+   * @throws NoSuchPartitionsException If the partition identifiers to drop doesn't exist
+   */
+  void dropPartitions(
+      InternalRow[] idents) throws NoSuchPartitionsException;
+
+  /**
+   * Replace the partition metadata of the existing partitions atomically.
+   *

+   * If any partition doesn't exist,
+   * the operation of replacePartitionMetadatas needs to be safely rolled back.
+   *
+   * @param idents the partition identifiers of the existing partitions
+   * @param properties the new metadata of the partitions
+   * @throws NoSuchPartitionsException If the partition identifiers to alter doesn't exist
+   * @throws UnsupportedOperationException If partition property is not supported
+   */
+  void replacePartitionMetadatas(
+      InternalRow[] idents,
+      Map<String, String>[] properties)
+      throws NoSuchPartitionsException, UnsupportedOperationException;
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
similarity index 90%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
index e61bcc1411c55..4e52ce53fa7e7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.connector.catalog;
 
 import java.util.Map;
@@ -41,7 +42,7 @@
  * @since 3.1.0
  */
 @Experimental
-public interface SupportsPartitions extends Table {
+public interface SupportsPartitionManagement extends Table {
 
   /**
    * @return the partition schema of the table
@@ -69,6 +70,16 @@ void createPartition(
    */
   boolean dropPartition(InternalRow ident);
 
+  /**
+   * Test whether a partition exists using an {@link Identifier identifier} from the table.
+   *
+   * @param ident a partition identifier
+   * @return true if the partition exists, false otherwise
+   */
+  default boolean partitionExists(InternalRow ident) {
+    return listPartitionIdentifiers(ident).length > 0;
+  }
+
   /**
    * Replace the partition metadata of the existing partition.
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index e03a0cf056f5c..bfc3b3d0ac966 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -62,10 +62,18 @@ class PartitionAlreadyExistsException(message: String) extends AnalysisException } } -class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) - extends AnalysisException( - s"The following partitions already exists in table '$table' database '$db':\n" +class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) { + def this(db: String, table: String, specs: Seq[TablePartitionSpec]) { + this(s"The following partitions already exists in table '$table' database '$db':\n" + specs.mkString("\n===\n")) + } + + def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = { + this(s"The following partitions already exists in table $tableName:" + + partitionIdents.map(_.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")).mkString("\n===\n")) + } +} class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 5d0fa9a9cfd8e..88be441d808db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -54,9 +54,9 @@ class NoSuchPartitionException(message: String) extends AnalysisException(messag } def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { - this(s"Partition not found in table $tableName: " + - s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) - .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")}") + this(s"Partition not found in table $tableName: " + + partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")) } } @@ -68,10 +68,18 @@ class NoSuchFunctionException(db: String, func: String, cause: Option[Throwable] s"Undefined function: '$func'. 
This function is neither a registered temporary function nor " + s"a permanent function registered in the database '$db'.", cause = cause) -class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec]) - extends AnalysisException( - s"The following partitions not found in table '$table' database '$db':\n" +class NoSuchPartitionsException(message: String) extends AnalysisException(message) { + def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { + this(s"The following partitions not found in table '$table' database '$db':\n" + specs.mkString("\n===\n")) + } + + def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = { + this(s"The following partitions not found in table $tableName: " + + partitionIdents.map(_.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) + .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")).mkString("\n===\n")) + } +} class NoSuchTempFunctionException(func: String) extends AnalysisException(s"Temporary function '$func' not found") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala new file mode 100644 index 0000000000000..083e9e281c3f5 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType + +/** + * This class is used to test SupportsAtomicPartitionManagement API. 
+ */ +class InMemoryAtomicPartitionTable ( + name: String, + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]) + extends InMemoryPartitionTable(name, schema, partitioning, properties) + with SupportsAtomicPartitionManagement { + + override def createPartitions( + idents: Array[InternalRow], + properties: Array[util.Map[String, String]]): Unit = { + if (idents.exists(partitionExists)) { + throw new PartitionsAlreadyExistException( + name, idents.filter(partitionExists), partitionSchema) + } + idents.zip(properties).foreach { case (ident, property) => + createPartition(ident, property) + } + } + + override def dropPartitions(idents: Array[InternalRow]): Unit = { + if (!idents.forall(partitionExists)) { + throw new NoSuchPartitionsException( + name, idents.filterNot(partitionExists), partitionSchema) + } + idents.foreach(dropPartition) + } + + override def replacePartitionMetadatas( + idents: Array[InternalRow], + properties: Array[util.Map[String, String]]): Unit = { + if (!idents.forall(partitionExists)) { + throw new NoSuchPartitionsException( + name, idents.filterNot(partitionExists), partitionSchema) + } + idents.zip(properties).foreach { case (ident, property) => + replacePartitionMetadata(ident, property) + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index b435817b8beab..c76882d2e919b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -24,20 +24,19 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} -import org.apache.spark.sql.connector.catalog.SupportsPartitions +import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType - /** - * This class is used to test SupportsPartitions API. + * This class is used to test SupportsPartitionManagement API. 
*/ class InMemoryPartitionTable( name: String, schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]) - extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitions { + extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitionManagement { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] = @@ -91,5 +90,6 @@ class InMemoryPartitionTable( .filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray } - def partitionExists(ident: InternalRow): Boolean = memoryTablePartitions.containsKey(ident) + override def partitionExists(ident: InternalRow): Boolean = + memoryTablePartitions.containsKey(ident) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala new file mode 100644 index 0000000000000..0d6c33948a86f --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.catalog + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog} +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class SupportsAtomicPartitionManagementSuite extends SparkFunSuite { + + private val ident: Identifier = Identifier.of(Array("ns"), "test_table") + + def ref(name: String): NamedReference = LogicalExpressions.parseReference(name) + + private val catalog: InMemoryTableCatalog = { + val newCatalog = new InMemoryTableCatalog + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog.createTable( + ident, + new StructType() + .add("id", IntegerType) + .add("data", StringType) + .add("dt", StringType), + Array(LogicalExpressions.identity(ref("dt"))), + util.Collections.emptyMap[String, String]) + newCatalog + } + + test("createPartitions") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + partTable.createPartitions( + partIdents, + Array(new util.HashMap[String, String](), new util.HashMap[String, String]())) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(InternalRow.apply("3"))) + assert(partTable.partitionExists(InternalRow.apply("4"))) + + partTable.dropPartition(InternalRow.apply("3")) + partTable.dropPartition(InternalRow.apply("4")) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("createPartitions failed if partition already exists") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("4") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + assertThrows[PartitionsAlreadyExistException]( + partTable.createPartitions( + partIdents, + Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))) + assert(!partTable.partitionExists(InternalRow.apply("3"))) + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("dropPartitions") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + partTable.createPartitions( + partIdents, + Array(new util.HashMap[String, String](), new util.HashMap[String, String]())) + 
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(InternalRow.apply("3"))) + assert(partTable.partitionExists(InternalRow.apply("4"))) + + partTable.dropPartitions(partIdents) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("dropPartitions failed if partition not exists") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("4") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + assertThrows[NoSuchPartitionsException]( + partTable.dropPartitions(partIdents)) + assert(partTable.partitionExists(partIdent)) + + partTable.dropPartition(partIdent) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("replacePartitionMetadatas") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + partTable.createPartitions( + partIdents, + Array(new util.HashMap[String, String](), new util.HashMap[String, String]())) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + + partTable.replacePartitionMetadatas( + partIdents, + Array(Map("paramKey" -> "paramValue").asJava, Map("paramKey1" -> "paramValue1").asJava)) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(InternalRow.apply("3"))) + assert(!partTable.loadPartitionMetadata(InternalRow.apply("3")).isEmpty) + assert(partTable.loadPartitionMetadata(InternalRow.apply("3")).get("paramKey") == "paramValue") + assert(partTable.partitionExists(InternalRow.apply("4"))) + assert(!partTable.loadPartitionMetadata(InternalRow.apply("4")).isEmpty) + assert( + partTable.loadPartitionMetadata(InternalRow.apply("4")).get("paramKey1") == "paramValue1") + + partTable.dropPartitions(partIdents) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } + + test("replacePartitionMetadatas failed if partition not exists") { + val table = catalog.loadTable(ident) + val partTable = new InMemoryAtomicPartitionTable( + table.name(), table.schema(), table.partitioning(), table.properties()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + + val partIdent = InternalRow.apply("4") + partTable.createPartition(partIdent, new util.HashMap[String, String]()) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + + val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) + assertThrows[NoSuchPartitionsException]( + partTable.replacePartitionMetadatas( + partIdents, + Array(Map("paramKey" -> "paramValue").asJava, Map("paramKey1" -> "paramValue1").asJava))) + assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) + assert(partTable.partitionExists(partIdent)) + assert(partTable.loadPartitionMetadata(partIdent).isEmpty) + assert(!partTable.partitionExists(InternalRow.apply("3"))) + + partTable.dropPartition(partIdent) + 
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala similarity index 98% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index 91cf8718e1249..e8e28e3422f27 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedRefe import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class SupportsPartitionsSuite extends SparkFunSuite { +class SupportsPartitionManagementSuite extends SparkFunSuite { private val ident: Identifier = Identifier.of(Array("ns"), "test_table") From b570496a5bcec5ebedec63fa3acc3066af3d7824 Mon Sep 17 00:00:00 2001 From: stczwd Date: Mon, 10 Aug 2020 11:00:27 +0800 Subject: [PATCH 16/23] retest github action Change-Id: Icf56c7c4ed461a2227ae2195425c36cc7cfbec7c --- .../connector/catalog/SupportsAtomicPartitionManagement.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index 3a40af11ff44c..34c943d02c8ab 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -64,7 +64,7 @@ void createPartitions( * the operation of dropPartitions need to be safely rolled back. 
* * @param idents an array of partition identifiers - * @throws NoSuchPartitionsException If the partition identifiers to drop doesn't exist + * @throws NoSuchPartitionsException If any partition identifier to drop doesn't exist */ void dropPartitions( InternalRow[] idents) throws NoSuchPartitionsException; @@ -77,7 +77,7 @@ void dropPartitions( * * @param idents the partition identifier of the existing partitions * @param properties the new metadata of the partitions - * @throws NoSuchPartitionsException If the partition identifiers to alter doesn't exist + * @throws NoSuchPartitionsException If any partition identifier to alter doesn't exist * @throws UnsupportedOperationException If partition property is not supported */ void replacePartitionMetadatas( From b3a6e2b73afbf07de459cd732db6d7f949955ccb Mon Sep 17 00:00:00 2001 From: stczwd Date: Tue, 11 Aug 2020 21:33:54 +0800 Subject: [PATCH 17/23] change comments and add default API in SupportsAtomicPartitionManagement Change-Id: I4870b050b2979991d039407e7c99a638b2eb8cd0 --- .../SupportsAtomicPartitionManagement.java | 40 ++++++++++++++++++- .../catalog/SupportsPartitionManagement.java | 2 +- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index 34c943d02c8ab..97a79f9e001cc 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -21,7 +21,9 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException; +import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; /** @@ -30,9 +32,9 @@ * These APIs are used to modify table partition or partition metadata, * they will change the table data as well. 
* ${@link #createPartitions}: - * add an array of partitions and any data that their location contains to the table + * add an array of partitions and any data they contain to the table * ${@link #dropPartitions}: - * remove an array of partitions and any data they contains from the table + * remove an array of partitions and any data they contain from the table * ${@link #replacePartitionMetadatas}: * point an array of partitions to new locations, which will swap location's data for the other * @@ -41,6 +43,40 @@ @Experimental public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement { + @Override + default void createPartition( + InternalRow ident, + Map properties) + throws PartitionAlreadyExistsException, UnsupportedOperationException { + try { + createPartitions(new InternalRow[]{ident}, new Map[]{properties}); + } catch (PartitionsAlreadyExistException e) { + throw new PartitionAlreadyExistsException(e.getMessage()); + } + } + + @Override + default boolean dropPartition(InternalRow ident) { + try { + dropPartitions(new InternalRow[]{ident}); + return true; + } catch (NoSuchPartitionsException e) { + return false; + } + } + + @Override + default void replacePartitionMetadata( + InternalRow ident, + Map properties) + throws NoSuchPartitionException, UnsupportedOperationException { + try { + replacePartitionMetadatas(new InternalRow[]{ident}, new Map[]{properties}); + } catch (NoSuchPartitionsException e) { + throw new NoSuchPartitionException(e.getMessage()); + } + } + /** * Create an array of partitions atomically in table. *

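With these defaults in place, a connector that implements only the atomic batch operations also satisfies the entire single-partition surface inherited from SupportsPartitionManagement. The following sketch is illustrative and not part of the patch: the helper class, method name, and partition value are hypothetical, and "table" stands for any SupportsAtomicPartitionManagement implementation, such as the in-memory test table used later in this series. It shows where each single-partition call ends up under the API as of this patch:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
    import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

    // Hypothetical caller, for illustration only.
    final class DefaultDelegationSketch {
      static void run(SupportsAtomicPartitionManagement table)
          throws PartitionAlreadyExistsException, NoSuchPartitionException {
        InternalRow ident = new GenericInternalRow(new Object[]{"2020-08-11"});
        Map<String, String> props = new HashMap<>();

        // Routes into createPartitions(new InternalRow[]{ident}, new Map[]{props});
        // a batch PartitionsAlreadyExistException is rethrown in its singular form.
        table.createPartition(ident, props);

        // Routes into replacePartitionMetadatas, translating
        // NoSuchPartitionsException into NoSuchPartitionException.
        table.replacePartitionMetadata(ident, props);

        // Routes into dropPartitions; NoSuchPartitionsException is absorbed
        // and reported as a false return value.
        boolean dropped = table.dropPartition(ident);
        assert dropped;
      }
    }

An implementation remains free to override these defaults where it has a cheaper direct path; the in-memory table in this series overrides them in the following patches.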
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java index 4e52ce53fa7e7..ff7cee18556f1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java @@ -33,7 +33,7 @@ * These APIs are used to modify table partition identifier or partition metadata. * In some cases, they will change the table data as well. * ${@link #createPartition}: - * add a partition and any data that its location contains to the table + * add a partition and any data it contains to the table * ${@link #dropPartition}: * remove a partition and any data it contains from the table * ${@link #replacePartitionMetadata}: From 279cea6553cb07d54103733be6d58ab0e8c1da7a Mon Sep 17 00:00:00 2001 From: stczwd Date: Tue, 11 Aug 2020 22:04:58 +0800 Subject: [PATCH 18/23] add override api in InMemoryAtomicPartitionTable Change-Id: I0c3c6c44c3f7a6187159c22aa91c7c2de204acee --- .../connector/InMemoryAtomicPartitionTable.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala index 083e9e281c3f5..5361b4ff45709 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala @@ -36,6 +36,19 @@ class InMemoryAtomicPartitionTable ( extends InMemoryPartitionTable(name, schema, partitioning, properties) with SupportsAtomicPartitionManagement { + override def createPartition( + ident: InternalRow, + properties: util.Map[String, String]): Unit = + super.createPartition(ident, properties) + + override def dropPartition(ident: InternalRow): Boolean = + super.dropPartition(ident) + + override def replacePartitionMetadata( + ident: InternalRow, + properties: util.Map[String, String]): Unit = + super.replacePartitionMetadata(ident, properties) + override def createPartitions( idents: Array[InternalRow], properties: Array[util.Map[String, String]]): Unit = { From 4bf97113834666c5505d4a550cca77969beeaaed Mon Sep 17 00:00:00 2001 From: stczwd Date: Tue, 11 Aug 2020 23:25:17 +0800 Subject: [PATCH 19/23] fix suites Change-Id: I902fe987e6685aa51386bda4c65e19998f317ee8 --- .../InMemoryAtomicPartitionTable.scala | 30 ++++++++++++++----- .../connector/InMemoryPartitionTable.scala | 2 +- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala index 5361b4ff45709..eb08bf0a99a17 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import java.util import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchPartitionsException, 
PartitionAlreadyExistsException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -38,16 +38,32 @@ class InMemoryAtomicPartitionTable ( override def createPartition( ident: InternalRow, - properties: util.Map[String, String]): Unit = - super.createPartition(ident, properties) + properties: util.Map[String, String]): Unit = { + if (memoryTablePartitions.containsKey(ident)) { + throw new PartitionAlreadyExistsException(name, ident, partitionSchema) + } else { + memoryTablePartitions.put(ident, properties) + } + } - override def dropPartition(ident: InternalRow): Boolean = - super.dropPartition(ident) + override def dropPartition(ident: InternalRow): Boolean = { + if (memoryTablePartitions.containsKey(ident)) { + memoryTablePartitions.remove(ident) + true + } else { + false + } + } override def replacePartitionMetadata( ident: InternalRow, - properties: util.Map[String, String]): Unit = - super.replacePartitionMetadata(ident, properties) + properties: util.Map[String, String]): Unit = { + if (memoryTablePartitions.containsKey(ident)) { + memoryTablePartitions.put(ident, properties) + } else { + throw new NoSuchPartitionException(name, ident, partitionSchema) + } + } override def createPartitions( idents: Array[InternalRow], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala index c76882d2e919b..1c96bdf3afa20 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala @@ -39,7 +39,7 @@ class InMemoryPartitionTable( extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitionManagement { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] = + protected val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] = new ConcurrentHashMap[InternalRow, util.Map[String, String]]() def partitionSchema: StructType = { From c96e0fc4377adb3435848acddf0cd0e8f44ab3e4 Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 12 Aug 2020 00:27:56 +0800 Subject: [PATCH 20/23] remove replacePartitionMetadatas Change-Id: I14207ee891274be86c591c68abd9e66666c15d03 --- .../SupportsAtomicPartitionManagement.java | 31 ----------- .../InMemoryAtomicPartitionTable.scala | 24 +-------- ...pportsAtomicPartitionManagementSuite.scala | 54 ------------------- 3 files changed, 1 insertion(+), 108 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index 97a79f9e001cc..67be086554069 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException; import 
org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; @@ -35,8 +34,6 @@ * add an array of partitions and any data they contain to the table * ${@link #dropPartitions}: * remove an array of partitions and any data they contain from the table - * ${@link #replacePartitionMetadatas}: - * point an array of partitions to new locations, which will swap location's data for the other * * @since 3.1.0 */ @@ -65,18 +62,6 @@ default boolean dropPartition(InternalRow ident) { } } - @Override - default void replacePartitionMetadata( - InternalRow ident, - Map properties) - throws NoSuchPartitionException, UnsupportedOperationException { - try { - replacePartitionMetadatas(new InternalRow[]{ident}, new Map[]{properties}); - } catch (NoSuchPartitionsException e) { - throw new NoSuchPartitionException(e.getMessage()); - } - } - /** * Create an array of partitions atomically in table. *

@@ -104,20 +89,4 @@ void createPartitions( */ void dropPartitions( InternalRow[] idents) throws NoSuchPartitionsException; - - /** - * Replace the partition metadata of the existing partitions atomically. - *

- * If any partition doesn't exists, - * the operation of replacePartitionMetadatas need to be safely rolled back. - * - * @param idents the partition identifier of the existing partitions - * @param properties the new metadata of the partitions - * @throws NoSuchPartitionsException If any partition identifier to alter doesn't exist - * @throws UnsupportedOperationException If partition property is not supported - */ - void replacePartitionMetadatas( - InternalRow[] idents, - Map[] properties) - throws NoSuchPartitionsException, UnsupportedOperationException; } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala index eb08bf0a99a17..17f7eba812e7b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import java.util import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -55,16 +55,6 @@ class InMemoryAtomicPartitionTable ( } } - override def replacePartitionMetadata( - ident: InternalRow, - properties: util.Map[String, String]): Unit = { - if (memoryTablePartitions.containsKey(ident)) { - memoryTablePartitions.put(ident, properties) - } else { - throw new NoSuchPartitionException(name, ident, partitionSchema) - } - } - override def createPartitions( idents: Array[InternalRow], properties: Array[util.Map[String, String]]): Unit = { @@ -84,16 +74,4 @@ class InMemoryAtomicPartitionTable ( } idents.foreach(dropPartition) } - - override def replacePartitionMetadatas( - idents: Array[InternalRow], - properties: Array[util.Map[String, String]]): Unit = { - if (!idents.forall(partitionExists)) { - throw new NoSuchPartitionsException( - name, idents.filterNot(partitionExists), partitionSchema) - } - idents.zip(properties).foreach { case (ident, property) => - replacePartitionMetadata(ident, property) - } - } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala index 0d6c33948a86f..392c8cd15280c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector.catalog import java.util -import scala.collection.JavaConverters._ - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException} @@ -126,56 +124,4 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite { partTable.dropPartition(partIdent) 
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) } - - test("replacePartitionMetadatas") { - val table = catalog.loadTable(ident) - val partTable = new InMemoryAtomicPartitionTable( - table.name(), table.schema(), table.partitioning(), table.properties()) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - - val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) - partTable.createPartitions( - partIdents, - Array(new util.HashMap[String, String](), new util.HashMap[String, String]())) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) - - partTable.replacePartitionMetadatas( - partIdents, - Array(Map("paramKey" -> "paramValue").asJava, Map("paramKey1" -> "paramValue1").asJava)) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) - assert(partTable.partitionExists(InternalRow.apply("3"))) - assert(!partTable.loadPartitionMetadata(InternalRow.apply("3")).isEmpty) - assert(partTable.loadPartitionMetadata(InternalRow.apply("3")).get("paramKey") == "paramValue") - assert(partTable.partitionExists(InternalRow.apply("4"))) - assert(!partTable.loadPartitionMetadata(InternalRow.apply("4")).isEmpty) - assert( - partTable.loadPartitionMetadata(InternalRow.apply("4")).get("paramKey1") == "paramValue1") - - partTable.dropPartitions(partIdents) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - } - - test("replacePartitionMetadatas failed if partition not exists") { - val table = catalog.loadTable(ident) - val partTable = new InMemoryAtomicPartitionTable( - table.name(), table.schema(), table.partitioning(), table.properties()) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - - val partIdent = InternalRow.apply("4") - partTable.createPartition(partIdent, new util.HashMap[String, String]()) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) - - val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4")) - assertThrows[NoSuchPartitionsException]( - partTable.replacePartitionMetadatas( - partIdents, - Array(Map("paramKey" -> "paramValue").asJava, Map("paramKey1" -> "paramValue1").asJava))) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty) - assert(partTable.partitionExists(partIdent)) - assert(partTable.loadPartitionMetadata(partIdent).isEmpty) - assert(!partTable.partitionExists(InternalRow.apply("3"))) - - partTable.dropPartition(partIdent) - assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) - } } From 4686a2416bf5c1e1f5d85e300e7122c5ab22fbdb Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 12 Aug 2020 20:13:00 +0800 Subject: [PATCH 21/23] change dropPartition returns Change-Id: Id45c2cfd3acbbd6fbcb88d0ac1452fc8aa4c19fa --- .../catalog/SupportsAtomicPartitionManagement.java | 13 +++---------- .../catalog/SupportsPartitionManagement.java | 4 +++- .../connector/InMemoryAtomicPartitionTable.scala | 9 ++++----- .../SupportsAtomicPartitionManagementSuite.scala | 5 ++--- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index 67be086554069..754203125cdc2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java
@@ -21,7 +21,6 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException;
 import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
 
@@ -54,12 +53,7 @@ default void createPartition(
 
   @Override
   default boolean dropPartition(InternalRow ident) {
-    try {
-      dropPartitions(new InternalRow[]{ident});
-      return true;
-    } catch (NoSuchPartitionsException e) {
-      return false;
-    }
+    return dropPartitions(new InternalRow[]{ident});
   }
 
   /**
@@ -85,8 +79,7 @@ void createPartitions(
    * the operation of dropPartitions need to be safely rolled back.
    *
    * @param idents an array of partition identifiers
-   * @throws NoSuchPartitionsException If any partition identifier to drop doesn't exist
+   * @return true if partitions were deleted, false if any partition does not exist
    */
-  void dropPartitions(
-    InternalRow[] idents) throws NoSuchPartitionsException;
+  boolean dropPartitions(InternalRow[] idents);
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
index ff7cee18556f1..6d49bb50ef80d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -45,6 +45,8 @@ public interface SupportsPartitionManagement extends Table {
 
   /**
+   * Get the partition schema of the table,
+   * this must be consistent with ${@link Table#partitioning()}
    * @return the partition schema of table
    */
   StructType partitionSchema();
@@ -71,7 +73,7 @@ void createPartition(
   boolean dropPartition(InternalRow ident);
 
   /**
-   * Test whether a partition exists using an {@link Identifier identifier} from the table.
+   * Test whether a partition exists using an {@link InternalRow ident} from the table.
    *
    * @param ident a partition identifier
    * @return true if the partition exists, false otherwise
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
index 17f7eba812e7b..c2a95cc3b8b07 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
 import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
@@ -67,11 +67,10 @@ class InMemoryAtomicPartitionTable (
     }
   }
 
-  override def dropPartitions(idents: Array[InternalRow]): Unit = {
+  override def dropPartitions(idents: Array[InternalRow]): Boolean = {
     if (!idents.forall(partitionExists)) {
-      throw new NoSuchPartitionsException(
-        name, idents.filterNot(partitionExists), partitionSchema)
+      return false
     }
-    idents.foreach(dropPartition)
+    idents.forall(dropPartition)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index 392c8cd15280c..6f7c30653110b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -21,7 +21,7 @@ import java.util
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -117,8 +117,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
     assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
 
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
-    assertThrows[NoSuchPartitionsException](
-      partTable.dropPartitions(partIdents))
+    assert(!partTable.dropPartitions(partIdents))
     assert(partTable.partitionExists(partIdent))
 
     partTable.dropPartition(partIdent)

From 4f1bff391473bfc5f97348388e7d0aa3a998e699 Mon Sep 17 00:00:00 2001
From: stczwd
Date: Wed, 12 Aug 2020 23:03:45 +0800
Subject: [PATCH 22/23] restart git actions

Change-Id: Icbe0edf5eec106dd7dff5ce3463cff82ec0310a0
---
 .../sql/connector/catalog/SupportsPartitionManagement.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
index 6d49bb50ef80d..efa07eeded19a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -46,7 +46,8 @@ public interface SupportsPartitionManagement extends Table {
 
   /**
    * Get the partition schema of the table,
-   * this must be consistent with ${@link Table#partitioning()}
+   * this must be consistent with ${@link Table#partitioning()}.
+   *
    * @return the partition schema of table
    */
   StructType partitionSchema();

From bfd17d477cd61eaac7a0c328bc1e3faa9fad5694 Mon Sep 17 00:00:00 2001
From: stczwd
Date: Wed, 12 Aug 2020 23:23:01 +0800
Subject: [PATCH 23/23] restart git actions

Change-Id: I2d07560c289b6b90092f778a4763a578f938c887
---
 .../spark/sql/connector/catalog/SupportsPartitionManagement.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
index efa07eeded19a..446ea1463309f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -47,7 +47,6 @@ public interface SupportsPartitionManagement extends Table {
   /**
    * Get the partition schema of the table,
    * this must be consistent with ${@link Table#partitioning()}.
-   *
    * @return the partition schema of table
    */
   StructType partitionSchema();
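
Taken as a whole, the series leaves createPartitions all-or-nothing, signalling conflicts with PartitionsAlreadyExistException, while dropPartitions now reports a missing partition through its Boolean result rather than throwing NoSuchPartitionsException. Below is a closing sketch of that final contract, illustrative only: the helper class and the partition values are hypothetical, and partTable stands for any SupportsAtomicPartitionManagement implementation, for example the InMemoryAtomicPartitionTable exercised by the suites above.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

    // Hypothetical caller, for illustration only.
    final class AtomicContractSketch {
      @SuppressWarnings("unchecked")
      static boolean run(SupportsAtomicPartitionManagement partTable)
          throws PartitionsAlreadyExistException {
        InternalRow p3 = new GenericInternalRow(new Object[]{"3"});
        InternalRow p4 = new GenericInternalRow(new Object[]{"4"});

        // All-or-nothing: either both partitions are created or neither is.
        partTable.createPartitions(
            new InternalRow[]{p3, p4},
            new Map[]{new HashMap<String, String>(), new HashMap<String, String>()});

        // Dropping a present and an absent partition together returns false
        // instead of throwing, and must leave the table unchanged.
        InternalRow p5 = new GenericInternalRow(new Object[]{"5"});
        boolean dropped = partTable.dropPartitions(new InternalRow[]{p3, p5});
        assert !dropped && partTable.partitionExists(p3) && partTable.partitionExists(p4);

        // Dropping exactly the existing partitions succeeds.
        return partTable.dropPartitions(new InternalRow[]{p3, p4});
      }
    }

The Boolean result keeps dropPartitions aligned with the single-partition dropPartition contract, which is what allows the default override in PATCH 21 to collapse into a one-line delegation.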