Skip to content

Commit

Permalink
[SPARK-33509][SQL] List partition by names from a V2 table which supp…
Browse files Browse the repository at this point in the history
…orts partition management

### What changes were proposed in this pull request?
1. Add new method `listPartitionByNames` to the `SupportsPartitionManagement` interface. It allows to list partitions by partition names and their values.
2. Implement new method in `InMemoryPartitionTable` which is used in DSv2 tests.

### Why are the changes needed?
Currently, the `SupportsPartitionManagement` interface exposes only `listPartitionIdentifiers` which allows to list partitions by partition values. And it requires to specify all values for partition schema fields in the prefix. This restriction does not allow to list partitions by some of partition names (not all of them).

For example, the table `tableA` is partitioned by two column `year` and `month`
```
CREATE TABLE tableA (price int, year int, month int)
USING _
partitioned by (year, month)
```
and has the following partitions:
```
PARTITION(year = 2015, month = 1)
PARTITION(year = 2015, month = 2)
PARTITION(year = 2016, month = 2)
PARTITION(year = 2016, month = 3)
```
If we want to list all partitions with `month = 2`, we have to specify `year` for **listPartitionIdentifiers()** which not always possible as we don't know all `year` values in advance. New method **listPartitionByNames()** allows to specify partition values only for `month`, and get two partitions:
```
PARTITION(year = 2015, month = 2)
PARTITION(year = 2016, month = 2)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suite `SupportsPartitionManagementSuite`.

Closes #30452 from MaxGekk/column-names-listPartitionIdentifiers.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and cloud-fan committed Nov 25, 2020
1 parent 19f3b89 commit 2c5cc36
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,19 @@ Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that contains the ident in a table.
* List the identifiers of all partitions that have the ident prefix in a table.
*
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);

/**
* List the identifiers of all partitions that match to the ident by names.
*
* @param names the names of partition values in the identifier.
* @param ident a partition identifier values.
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionByNames(String[] names, InternalRow ident);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -96,4 +97,25 @@ class InMemoryPartitionTable(
override protected def addPartitionKey(key: Seq[Any]): Unit = {
memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
}

override def listPartitionByNames(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
assert(names.length == ident.numFields,
s"Number of partition names (${names.length}) must be equal to " +
s"the number of partition values (${ident.numFields}).")
val schema = partitionSchema
assert(names.forall(fieldName => schema.fieldNames.contains(fieldName)),
s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
s"the partition schema '${schema.sql}'.")
val indexes = names.map(schema.fieldIndex)
val dataTypes = names.map(schema(_).dataType)
val currentRow = new GenericInternalRow(new Array[Any](names.length))
memoryTablePartitions.keySet().asScala.filter { key =>
for (i <- 0 until names.length) {
currentRow.values(i) = key.get(indexes(i), dataTypes(i))
}
currentRow == ident
}.toArray
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -140,4 +140,45 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
}

test("listPartitionByNames") {
val partCatalog = new InMemoryPartitionTableCatalog
partCatalog.initialize("test", CaseInsensitiveStringMap.empty())
val table = partCatalog.createTable(
ident,
new StructType()
.add("col0", IntegerType)
.add("part0", IntegerType)
.add("part1", StringType),
Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))),
util.Collections.emptyMap[String, String])
val partTable = table.asInstanceOf[InMemoryPartitionTable]

Seq(
InternalRow(0, "abc"),
InternalRow(0, "def"),
InternalRow(1, "abc")).foreach { partIdent =>
partTable.createPartition(partIdent, new util.HashMap[String, String]())
}

Seq(
(Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")),
(Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")),
(Array("part1"), InternalRow("abc")) -> Set(InternalRow(0, "abc"), InternalRow(1, "abc")),
(Array.empty[String], InternalRow.empty) ->
Set(InternalRow(0, "abc"), InternalRow(0, "def"), InternalRow(1, "abc")),
(Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(),
(Array("part1"), InternalRow(3.14f)) -> Set()
).foreach { case ((names, idents), expected) =>
assert(partTable.listPartitionByNames(names, idents).toSet === expected)
}
// Check invalid parameters
Seq(
(Array("part0", "part1"), InternalRow(0)),
(Array("col0", "part1"), InternalRow(0, 1)),
(Array("wrong"), InternalRow("invalid"))
).foreach { case (names, idents) =>
intercept[AssertionError](partTable.listPartitionByNames(names, idents))
}
}
}

0 comments on commit 2c5cc36

Please sign in to comment.