
[SPARK-33509][SQL] List partition by names from a V2 table which supports partition management #30452

Closed
File: SupportsPartitionManagement.java
@@ -106,10 +106,19 @@ Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that contains the ident in a table.
* List the identifiers of all partitions that have the ident prefix in a table.
*
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);

/**
* List the identifiers of all partitions that match the ident by names.
*
* @param names the names of partition values in the identifier.
* @param ident the values of a partition identifier.
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionByNames(String[] names, InternalRow ident);
Member (HyukjinKwon):

Is a user supposed to access the Table directly? It looks a bit odd that the listPartitionByNames interface is added but not used on the Spark internal side.

Contributor:

Yea we should remove the old API as it's not released yet.

Member Author (MaxGekk):

The method listPartitionIdentifiers() is used only in partitionExists() and in tests. Let me remove it and replace its usage with listPartitionByNames().

How about renaming listPartitionByNames() to just listPartitions()?

Member Author (@MaxGekk, Nov 23, 2020):

> It looks a bit odd that the listPartitionByNames interface is added but not used on the Spark internal side.

@HyukjinKwon listPartitionByNames() will be used by V2 commands like SHOW PARTITIONS (in #30398) and SHOW TABLE EXTENDED.
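
For illustration only (a hedged sketch, not the actual #30398 code), such a command could translate a user-supplied partition spec into the (names, ident) pair the new API expects; the helper name and the all-string typing of partition values are assumptions:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: turn a spec such as PARTITION (part1 = 'abc') into the
// (names, ident) arguments of listPartitionByNames. String-typed values assumed.
def listBySpec(
    table: SupportsPartitionManagement,
    spec: Map[String, String]): Array[InternalRow] = {
  val names = spec.keys.toArray
  val ident = InternalRow.fromSeq(spec.values.map(v => UTF8String.fromString(v)).toSeq)
  table.listPartitionByNames(names, ident)
}
```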

Member Author (MaxGekk):

> Yea we should remove the old API as it's not released yet.

Frankly speaking, I would remove listPartitionIdentifiers() separately, as that requires changes unrelated to listing partitions by names.

Contributor:

I think the final API name should still be listPartitionIdentifiers

Contributor:

We can remove the old one in your next PR.

Contributor:

> I think the final API name should still be listPartitionIdentifiers

Agree with this.

}
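
To make the difference between the two listing methods concrete, here is a minimal sketch assuming a hypothetical table partitioned by (part0 INT, part1 STRING); the calls below are illustrative and not taken from the PR:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement

def listingExamples(table: SupportsPartitionManagement): Unit = {
  // Prefix-based: every partition whose first partition value (part0) is 0.
  table.listPartitionIdentifiers(InternalRow(0))

  // Name-based: every partition whose part1 value is "abc", regardless of part0.
  table.listPartitionByNames(Array("part1"), InternalRow("abc"))
}
```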
File: InMemoryPartitionTable.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -96,4 +97,25 @@ class InMemoryPartitionTable(
override protected def addPartitionKey(key: Seq[Any]): Unit = {
memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
}

override def listPartitionByNames(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
assert(names.length == ident.numFields,
s"Number of partition names (${names.length}) must be equal to " +
s"the number of partition values (${ident.numFields}).")
val schema = partitionSchema
assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)),
s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
s"the partition schema '${schema.sql}'.")
val indexes = names.map(schema.fieldIndex)
val dataTypes = names.map(schema(_).dataType)
Member Author (MaxGekk), on lines +111 to +112:

The names should be normalized after #30454, so we shouldn't need to care about case sensitivity here.

val currentRow = new GenericInternalRow(new Array[Any](names.length))
memoryTablePartitions.keySet().asScala.filter { key =>
for (i <- 0 until names.length) {
currentRow.values(i) = key.get(indexes(i), dataTypes(i))
}
currentRow == ident
}.toArray
}
}
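
To illustrate the projection-and-compare filter implemented above, here is a minimal standalone sketch that uses plain Scala collections instead of InternalRow; the values mirror the test data below but are otherwise assumptions:

```scala
// Partition keys as (part0, part1) values, mirroring the in-memory test data.
val partitionKeys = Seq(Seq[Any](0, "abc"), Seq[Any](0, "def"), Seq[Any](1, "abc"))

// Requesting names = Array("part1") resolves to field index 1,
// and the ident carries the requested value "abc".
val indexes = Seq(1)
val ident = Seq[Any]("abc")

// Each key is projected onto the requested indexes and compared with the ident.
val matched = partitionKeys.filter(key => indexes.map(i => key(i)) == ident)
// matched == Seq(Seq(0, "abc"), Seq(1, "abc"))
```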
File: SupportsPartitionManagementSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -140,4 +140,45 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
}

test("listPartitionByNames") {
val partCatalog = new InMemoryPartitionTableCatalog
partCatalog.initialize("test", CaseInsensitiveStringMap.empty())
val table = partCatalog.createTable(
ident,
new StructType()
.add("col0", IntegerType)
.add("part0", IntegerType)
.add("part1", StringType),
Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))),
util.Collections.emptyMap[String, String])
val partTable = table.asInstanceOf[InMemoryPartitionTable]

Seq(
InternalRow(0, "abc"),
InternalRow(0, "def"),
InternalRow(1, "abc")).foreach { partIdent =>
partTable.createPartition(partIdent, new util.HashMap[String, String]())
}

Seq(
(Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")),
(Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")),
(Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")),
(Array.empty[String], InternalRow.empty) ->
Member Author (MaxGekk): This is a special case that allows listing all partitions.

Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")),
(Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(),
(Array("part1"), InternalRow(3.14f)) -> Set()
).foreach { case ((names, idents), expected) =>
assert(partTable.listPartitionByNames(names, idents).toSet === expected)
}
// Check invalid parameters
Seq(
(Array("part0", "part1"), InternalRow(0)),
(Array("col0", "part1"), InternalRow(0, 1)),
(Array("wrong"), InternalRow("invalid"))
).foreach { case (names, idents) =>
intercept[AssertionError](partTable.listPartitionByNames(names, idents))
}
}
}