-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33509][SQL] List partition by names from a V2 table which supports partition management #30452
[SPARK-33509][SQL] List partition by names from a V2 table which supports partition management #30452
Changes from all commits
2d42120
d1cbc92
20d9121
75c4903
373e22e
3f20ee8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ | |
|
||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} | ||
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow | ||
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement | ||
import org.apache.spark.sql.connector.expressions.Transform | ||
import org.apache.spark.sql.types.StructType | ||
|
@@ -96,4 +97,25 @@ class InMemoryPartitionTable( | |
// Registers a partition for `key`: the key values are wrapped into an InternalRow and | ||
// stored in `memoryTablePartitions` together with an empty properties map (converted | ||
// to a Java map via `asJava`). | ||
override protected def addPartitionKey(key: Seq[Any]): Unit = { | ||
memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava) | ||
} | ||
|
||
// Returns the keys of the stored partitions whose values for the partition columns | ||
// listed in `names` are equal to the corresponding values in `ident`. | ||
// | ||
// `names` - partition column names to match on; every name must be a field of | ||
//           `partitionSchema`. | ||
// `ident` - the values to match, one per entry of `names`, in the same order. | ||
// | ||
// Both preconditions are checked with `assert`, so a violation surfaces as an | ||
// AssertionError rather than an IllegalArgumentException. | ||
override def listPartitionByNames( | ||
names: Array[String], | ||
ident: InternalRow): Array[InternalRow] = { | ||
// Precondition 1: exactly one value per partition name. | ||
assert(names.length == ident.numFields, | ||
s"Number of partition names (${names.length}) must be equal to " + | ||
s"the number of partition values (${ident.numFields}).") | ||
val schema = partitionSchema | ||
// Precondition 2: every requested name is a field of the partition schema. | ||
assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)), | ||
s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " + | ||
s"the partition schema '${schema.sql}'.") | ||
// For each requested name: its position in the partition schema and its data type, | ||
// both needed to read the matching value out of a stored partition key. | ||
val indexes = names.map(schema.fieldIndex) | ||
val dataTypes = names.map(schema(_).dataType) | ||
Comment on lines
+111
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The names should be normalized after #30454, so, we shouldn't care of case sensitivity here. |
||
// A single scratch row is allocated once and overwritten for every stored key. | ||
val currentRow = new GenericInternalRow(new Array[Any](names.length)) | ||
memoryTablePartitions.keySet().asScala.filter { key => | ||
// Project the stored key onto the requested columns, in the order given by `names`. | ||
for (i <- 0 until names.length) { | ||
currentRow.values(i) = key.get(indexes(i), dataTypes(i)) | ||
} | ||
// Keep the key only if the projected values equal `ident`. When `names` is empty | ||
// nothing is copied and an empty row is compared against `ident`. | ||
currentRow == ident | ||
}.toArray | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ | |
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog} | ||
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog} | ||
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} | ||
import org.apache.spark.sql.types.{IntegerType, StringType, StructType} | ||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
|
@@ -140,4 +140,45 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { | |
partTable.dropPartition(partIdent1) | ||
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty) | ||
} | ||
|
||
// Exercises InMemoryPartitionTable.listPartitionByNames: lookups by all partition | ||
// columns, by a subset of them, by none, with non-matching values, and with invalid | ||
// arguments. | ||
test("listPartitionByNames") { | ||
val partCatalog = new InMemoryPartitionTableCatalog | ||
partCatalog.initialize("test", CaseInsensitiveStringMap.empty()) | ||
// Table with one data column (col0) and two identity-partitioned columns (part0, part1). | ||
// NOTE(review): `ident` and `ref` are defined outside this hunk - presumably suite-level | ||
// helpers; confirm against the full file. | ||
val table = partCatalog.createTable( | ||
ident, | ||
new StructType() | ||
.add("col0", IntegerType) | ||
.add("part0", IntegerType) | ||
.add("part1", StringType), | ||
Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))), | ||
util.Collections.emptyMap[String, String]) | ||
val partTable = table.asInstanceOf[InMemoryPartitionTable] | ||
|
||
// Create three partitions: (0, "abc"), (0, "def") and (1, "abc"). | ||
Seq( | ||
InternalRow(0, "abc"), | ||
InternalRow(0, "def"), | ||
InternalRow(1, "abc")).foreach { partIdent => | ||
partTable.createPartition(partIdent, new util.HashMap[String, String]()) | ||
} | ||
|
||
// Each entry maps (partition names, values to match) to the expected set of partition keys. | ||
Seq( | ||
(Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")), | ||
(Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")), | ||
(Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")), | ||
(Array.empty[String], InternalRow.empty) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a special case which allows to list all partitions. |
||
Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")), | ||
// Values that match no created partition, including a value of a different type, | ||
// are expected to yield an empty result. | ||
(Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(), | ||
(Array("part1"), InternalRow(3.14f)) -> Set() | ||
).foreach { case ((names, idents), expected) => | ||
assert(partTable.listPartitionByNames(names, idents).toSet === expected) | ||
} | ||
// Check invalid parameters | ||
// The cases below are: fewer values than names; "col0", which is a table column but | ||
// not one of the partitioning columns; and "wrong", which is not a column of the table. | ||
// Each call is expected to fail with an AssertionError. | ||
Seq( | ||
(Array("part0", "part1"), InternalRow(0)), | ||
(Array("col0", "part1"), InternalRow(0, 1)), | ||
(Array("wrong"), InternalRow("invalid")) | ||
).foreach { case (names, idents) => | ||
intercept[AssertionError](partTable.listPartitionByNames(names, idents)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a user supposed to access `Table` directly? It looks a bit odd that the `listPartitionByNames` interface is added but not used on the Spark internal side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea we should remove the old API as it's not released yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method `listPartitionIdentifiers()` is used only in `partitionExists()` and in tests. Let me remove it and replace its usage with `listPartitionByNames()`.
How about renaming `listPartitionByNames()` to just `listPartitions()`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HyukjinKwon `listPartitionByNames()` will be used by V2 commands like `SHOW PARTITIONS` (in #30398) and `SHOW TABLE EXTENDED`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Frankly speaking, I would remove `listPartitionIdentifiers()` separately, as removing it requires changes that are unrelated to listing partitions by names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the final API name should still be `listPartitionIdentifiers`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the old one in your next PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with this.