Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33569][SQL] Remove getting partitions by an identifier prefix. #30514

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.connector.catalog;

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.annotation.Experimental;
Expand Down Expand Up @@ -79,7 +80,9 @@ void createPartition(
* @return true if the partition exists, false otherwise
*/
default boolean partitionExists(InternalRow ident) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR also changed this API. In the past, we coud use this API to check wether table had partitions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the changes, you can do the same, right?

return listPartitionIdentifiers(ident).length > 0;
String[] partitionNames = partitionSchema().names();
String[] requiredNames = Arrays.copyOfRange(partitionNames, 0, ident.numFields());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we fail if the given ident is only a prefix? partitionExists sounds like it should be applied to a certain partition.

BTW which command needs it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses in tests a lot, and in:

I guess, we can rewrite the cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we always call it with a complete partition spec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyway it's not related to this PR.

return listPartitionIdentifiers(requiredNames, ident).length > 0;
}

/**
Expand All @@ -105,20 +108,12 @@ void replacePartitionMetadata(
Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that have the ident prefix in a table.
*
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);

/**
* List the identifiers of all partitions that match to the ident by names.
*
* @param names the names of partition values in the identifier.
* @param ident a partition identifier values.
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionByNames(String[] names, InternalRow ident);
InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about listPartitionIdentifiers(StructType column, InternalRow ident)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StructType requires to set additional fields like nullable, metadata and etc. that could be not necessary to list partitions. For example, I use this method in https://github.com/apache/spark/pull/30398/files#diff-b8fcff1b4de2d6e3641609dc02db791af8c56c2660123fa2b4afd408597fb401R44 where names are coming from ResolvedPartitionSpec. With our proposal, I will have to extract StructType from the partition schema before calling listPartitionIdentifiers which is not convenient ( and necessary) from my point of view.

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,14 @@ class InMemoryPartitionTable(
}
}

def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = {
val prefixPartCols =
new StructType(partitionSchema.dropRight(partitionSchema.length - ident.numFields).toArray)
val prefixPart = ident.toSeq(prefixPartCols)
memoryTablePartitions.keySet().asScala
.filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray
}

override def partitionExists(ident: InternalRow): Boolean =
memoryTablePartitions.containsKey(ident)

override protected def addPartitionKey(key: Seq[Any]): Unit = {
memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
}

override def listPartitionByNames(
override def listPartitionIdentifiers(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
assert(names.length == ident.numFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,38 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
newCatalog
}

private def hasPartitions(table: SupportsPartitionManagement): Boolean = {
!table.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty
}

test("createPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(InternalRow.apply("3")))
assert(partTable.partitionExists(InternalRow.apply("4")))

partTable.dropPartition(InternalRow.apply("3"))
partTable.dropPartition(InternalRow.apply("4"))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("createPartitions failed if partition already exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
Expand All @@ -85,42 +89,42 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
assert(!partTable.partitionExists(InternalRow.apply("3")))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(InternalRow.apply("3")))
assert(partTable.partitionExists(InternalRow.apply("4")))

partTable.dropPartitions(partIdents)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartitions failed if partition not exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
assert(!partTable.dropPartitions(partIdents))
assert(partTable.partitionExists(partIdent))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,97 +48,101 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
newCatalog
}

private def hasPartitions(table: SupportsPartitionManagement): Boolean = {
!table.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty
}

test("createPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
val partIdent1 = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
partTable.createPartition(partIdent1, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("replacePartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(partTable.loadPartitionMetadata(partIdent).isEmpty)

partTable.replacePartitionMetadata(partIdent, Map("paramKey" -> "paramValue").asJava)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("loadPartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, Map("paramKey" -> "paramValue").asJava)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("listPartitionIdentifiers") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)

val partIdent1 = InternalRow.apply("4")
partTable.createPartition(partIdent1, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(partIdent1).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(Array("dt"), partIdent1).length == 1)

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("listPartitionByNames") {
Expand Down Expand Up @@ -170,15 +174,15 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
(Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(),
(Array("part1"), InternalRow(3.14f)) -> Set()
).foreach { case ((names, idents), expected) =>
assert(partTable.listPartitionByNames(names, idents).toSet === expected)
assert(partTable.listPartitionIdentifiers(names, idents).toSet === expected)
}
// Check invalid parameters
Seq(
(Array("part0", "part1"), InternalRow(0)),
(Array("col0", "part1"), InternalRow(0, 1)),
(Array("wrong"), InternalRow("invalid"))
).foreach { case (names, idents) =>
intercept[AssertionError](partTable.listPartitionByNames(names, idents))
intercept[AssertionError](partTable.listPartitionIdentifiers(names, idents))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
catalog("testpart").asTableCatalog.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(partTable.asPartitionable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

Expand All @@ -161,7 +162,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
spark.sql(s"ALTER TABLE $t DROP IF EXISTS PARTITION (id=1), PARTITION (id=2)")
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(partTable.asPartitionable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

Expand Down