[SPARK-33505][SQL][TESTS] Fix adding new partitions by INSERT INTO `InMemoryPartitionTable`

### What changes were proposed in this pull request?
1. Add a hook method `addPartitionKey()` to `InMemoryTable`, which is called for every inserted row.
2. Override `addPartitionKey()` in `InMemoryPartitionTable` so that a partition key is registered every time a new row is inserted into the table (see the condensed sketch below).
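
In outline, the change is a template-method hook, condensed from the diff below; the default is a no-op, so plain `InMemoryTable` behavior is unchanged:

// InMemoryTable: no-op hook, invoked once per row in withData().
protected def addPartitionKey(key: Seq[Any]): Unit = {}

// InMemoryPartitionTable: register the partition ident on every insert.
override protected def addPartitionKey(key: Seq[Any]): Unit = {
  memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
}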

### Why are the changes needed?
To be able to write unified tests for data sources V1 and V2. Currently, `INSERT INTO` a V1 table creates partitions, but the same doesn't work for the custom catalog `InMemoryPartitionTableCatalog` used in DSv2 tests.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suite `DataSourceV2SQLSuite`.
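
For example, via the usual Spark dev workflow (illustrative command, not part of the commit):

build/sbt "sql/testOnly *DataSourceV2SQLSuite"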

Closes #30449 from MaxGekk/insert-into-InMemoryPartitionTable.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
MaxGekk authored and dongjoon-hyun committed Nov 21, 2020
1 parent 67c6ed9 commit 530c0a8
Showing 3 changed files with 28 additions and 0 deletions.
InMemoryPartitionTable.scala
@@ -92,4 +92,8 @@ class InMemoryPartitionTable(

  override def partitionExists(ident: InternalRow): Boolean =
    memoryTablePartitions.containsKey(ident)

  override protected def addPartitionKey(key: Seq[Any]): Unit = {
    memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
  }
}
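
A note on why re-adding an existing partition is harmless (exercised by the test further down): a minimal standalone sketch, assuming `memoryTablePartitions` is a `ConcurrentHashMap[InternalRow, java.util.Map[String, String]]` whose declaration lies outside this hunk. `put()` on an existing key merely overwrites the value, and `containsKey` relies on structural equality of the ident rows.

import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow

// Standalone miniature of the assumed registry shape (hypothetical object,
// not Spark code): put() on a repeated key overwrites the value, so
// registering the same partition twice is effectively a no-op.
object PartitionRegistrySketch {
  private val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
    new ConcurrentHashMap[InternalRow, util.Map[String, String]]()

  def addPartitionKey(key: Seq[Any]): Unit =
    memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)

  def partitionExists(ident: InternalRow): Boolean =
    memoryTablePartitions.containsKey(ident)
}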
InMemoryTable.scala
@@ -160,12 +160,15 @@ class InMemoryTable(
    }
  }

  protected def addPartitionKey(key: Seq[Any]): Unit = {}

  def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
    data.foreach(_.rows.foreach { row =>
      val key = getKey(row)
      dataMap += dataMap.get(key)
        .map(key -> _.withRow(row))
        .getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
      addPartitionKey(key)
    })
    this
  }
DataSourceV2SQLSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog._
@@ -35,6 +36,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

class DataSourceV2SQLSuite
@@ -2538,6 +2540,25 @@ class DataSourceV2SQLSuite
    }
  }

  test("SPARK-33505: insert into partitioned table") {
    val t = "testpart.ns1.ns2.tbl"
    withTable(t) {
      sql(s"""
        |CREATE TABLE $t (id bigint, city string, data string)
        |USING foo
        |PARTITIONED BY (id, city)""".stripMargin)
      val partTable = catalog("testpart").asTableCatalog
        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
      val expectedPartitionIdent = InternalRow.fromSeq(Seq(1, UTF8String.fromString("NY")))
      assert(!partTable.partitionExists(expectedPartitionIdent))
      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'abc'")
      assert(partTable.partitionExists(expectedPartitionIdent))
      // Insert into the existing partition must not fail
      sql(s"INSERT INTO $t PARTITION(id = 1, city = 'NY') SELECT 'def'")
      assert(partTable.partitionExists(expectedPartitionIdent))
    }
  }

  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
    val e = intercept[AnalysisException] {
      sql(s"$sqlCommand $sqlParams")