From 1d1b76b848ec80da9375c34b141db2290a1f7a95 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 20:31:35 +0300
Subject: [PATCH 1/8] Add a test

---
 .../AlterTablePartitionV2SQLSuite.scala       | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index e05c2c09ace2a..51dd50c81e7a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -185,4 +185,32 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("ALTER TABLE ADD/DROP PARTITIONS: all types") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"""
+        |CREATE TABLE $t (
+        | part0 bigint
+        |) USING foo
+        |PARTITIONED BY (
+        | part0
+        |)""".stripMargin)
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      val expectedPartition = InternalRow.fromSeq(Seq(123))
+      assert(!partTable.partitionExists(expectedPartition))
+      sql(s"""
+        |ALTER TABLE $t ADD PARTITION (
+        | part0 = 123
+        |) LOCATION 'loc1'""".stripMargin)
+      assert(partTable.partitionExists(expectedPartition))
+      sql(s"""
+        |ALTER TABLE $t DROP PARTITION (
+        | part0 = 123
+        |)""".stripMargin)
+      assert(!partTable.partitionExists(expectedPartition))
+    }
+  }
 }

From 10f31eef79caad48a582659ecc347c2a64475926 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 20:47:23 +0300
Subject: [PATCH 2/8] Support existing types in the test

---
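Note (this text sits in the notes area below the "---" marker, which git am
ignores): SupportsPartitionManagement.partitionExists() takes the partition
values as a Catalyst InternalRow, so the expected row in the test has to use
Spark's internal value representations. A minimal sketch of that convention,
illustrative only and assuming spark-catalyst and spark-unsafe on the
classpath:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // Numeric types are stored as plain JVM primitives, while StringType
    // values live in InternalRow as UTF8String, not java.lang.String.
    val expected = InternalRow.fromSeq(Seq[Any](1, 2L, UTF8String.fromString("abc")))

The plain "abc" used at this point in the series is switched to
UTF8String.fromString("abc") in the next commit.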
sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)") assert(!partTable.partitionExists(expectedPartition)) } } From 1b91bbba63c69490b4b93c728d1fab6c248b5022 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 23 Nov 2020 22:05:34 +0300 Subject: [PATCH 3/8] Universal type converter --- .../analysis/ResolvePartitionSpec.scala | 29 ++----------------- .../AlterTablePartitionV2SQLSuite.scala | 5 ++-- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala index 531d40f431dee..01f94ddb9c3d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement @@ -65,31 +65,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { conf.resolver) val partValues = partSchema.map { part => - val partValue = normalizedSpec.get(part.name).orNull - if (partValue == null) { - null - } else { - // TODO: Support other datatypes, such as DateType - part.dataType match { - case _: ByteType => - partValue.toByte - case _: ShortType => - partValue.toShort - case _: IntegerType => - partValue.toInt - case _: LongType => - partValue.toLong - case _: FloatType => - partValue.toFloat - case _: DoubleType => - partValue.toDouble - case _: StringType => - partValue - case _ => - throw new AnalysisException( - s"Type ${part.dataType.typeName} is not supported for partition.") - } - } + val raw = normalizedSpec.get(part.name).orNull + Cast(Literal(raw), part.dataType, Some(conf.sessionLocalTimeZone)).eval() } InternalRow.fromSeq(partValues) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index fe75b48be454d..f57301191011b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, Partit import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.types.UTF8String class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { @@ -211,14 +212,14 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { val partTable = catalog("testpart").asTableCatalog .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")) .asPartitionable - val expectedPartition = InternalRow.fromSeq(Seq( + val expectedPartition = InternalRow.fromSeq(Seq[Any]( -1, // tinyint 0, // smallint 1, // int 2, // bigint 3.14F, // float 3.14D, 
 .../analysis/ResolvePartitionSpec.scala       | 29 ++-----------------
 .../AlterTablePartitionV2SQLSuite.scala       |  5 +++--
 2 files changed, 6 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 531d40f431dee..01f94ddb9c3d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -65,31 +65,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
       conf.resolver)
 
     val partValues = partSchema.map { part =>
-      val partValue = normalizedSpec.get(part.name).orNull
-      if (partValue == null) {
-        null
-      } else {
-        // TODO: Support other datatypes, such as DateType
-        part.dataType match {
-          case _: ByteType =>
-            partValue.toByte
-          case _: ShortType =>
-            partValue.toShort
-          case _: IntegerType =>
-            partValue.toInt
-          case _: LongType =>
-            partValue.toLong
-          case _: FloatType =>
-            partValue.toFloat
-          case _: DoubleType =>
-            partValue.toDouble
-          case _: StringType =>
-            partValue
-          case _ =>
-            throw new AnalysisException(
-              s"Type ${part.dataType.typeName} is not supported for partition.")
-        }
-      }
+      val raw = normalizedSpec.get(part.name).orNull
+      Cast(Literal(raw), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
     }
     InternalRow.fromSeq(partValues)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index fe75b48be454d..f57301191011b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, Partit
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
 
 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
 
@@ -211,14 +212,14 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       val partTable = catalog("testpart").asTableCatalog
         .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
         .asPartitionable
-      val expectedPartition = InternalRow.fromSeq(Seq(
+      val expectedPartition = InternalRow.fromSeq(Seq[Any](
         -1,    // tinyint
         0,     // smallint
         1,     // int
         2,     // bigint
         3.14F, // float
         3.14D, // double
-        "abc"  // string
+        UTF8String.fromString("abc") // string
       ))
       assert(!partTable.partitionExists(expectedPartition))
       val partSpec = """

From e4bd36467ba5545cab727906483c3f40ebfda79c Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 22:08:07 +0300
Subject: [PATCH 4/8] Check boolean

---
 .../AlterTablePartitionV2SQLSuite.scala       | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index f57301191011b..7de4ad1b3d178 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -198,16 +198,11 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         | part3 bigint,
         | part4 float,
         | part5 double,
-        | part6 string
+        | part6 string,
+        | part7 boolean
         |) USING foo
         |PARTITIONED BY (
-        | part0,
-        | part1,
-        | part2,
-        | part3,
-        | part4,
-        | part5,
-        | part6
+        | part0, part1, part2, part3, part4, part5, part6, part7
         |)""".stripMargin)
       val partTable = catalog("testpart").asTableCatalog
         .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
@@ -219,7 +214,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         2,     // bigint
         3.14F, // float
         3.14D, // double
-        UTF8String.fromString("abc") // string
+        UTF8String.fromString("abc"), // string
+        true // boolean
       ))
       assert(!partTable.partitionExists(expectedPartition))
       val partSpec = """
@@ -229,7 +225,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         | part3 = 2,
         | part4 = 3.14,
         | part5 = 3.14,
-        | part6 = 'abc'
+        | part6 = 'abc',
+        | part7 = true
         |""".stripMargin
       sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
       assert(partTable.partitionExists(expectedPartition))

From 8e5880b24528bce21d0ea127df01d1723fc957ec Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 22:17:51 +0300
Subject: [PATCH 5/8] Check date and timestamps

---
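Note (kept below the "---" marker, which git am ignores): Catalyst stores
DateType as days since the epoch (an Int) and TimestampType as microseconds
since the epoch (a Long), which is why the expected values are built from
toEpochDay and instantToMicros rather than from java.time objects directly.
For instance:

    import java.time.LocalDate

    LocalDate.parse("2020-11-23").toEpochDay  // 18589 days since 1970-01-01

The timestamp is materialized in the America/Los_Angeles zone
(DateTimeTestUtils.LA), matching the session time zone that Spark's SQL test
harness pins, so the expected microsecond value agrees with what the TIMESTAMP
cast produces.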
 .../AlterTablePartitionV2SQLSuite.scala       | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 7de4ad1b3d178..eacfafb53386c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.connector
 
+import java.time.{LocalDate, LocalDateTime}
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
@@ -199,10 +202,13 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         | part4 float,
         | part5 double,
         | part6 string,
-        | part7 boolean
+        | part7 boolean,
+        | part8 date,
+        | part9 timestamp
         |) USING foo
         |PARTITIONED BY (
-        | part0, part1, part2, part3, part4, part5, part6, part7
+        | part0, part1, part2, part3, part4, part5, part6, part7,
+        | part8, part9
         |)""".stripMargin)
       val partTable = catalog("testpart").asTableCatalog
         .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
@@ -215,7 +221,10 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         3.14F, // float
         3.14D, // double
         UTF8String.fromString("abc"), // string
-        true // boolean
+        true, // boolean
+        LocalDate.parse("2020-11-23").toEpochDay,
+        DateTimeUtils.instantToMicros(
+          LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
       ))
       assert(!partTable.partitionExists(expectedPartition))
       val partSpec = """
@@ -226,7 +235,9 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         | part4 = 3.14,
         | part5 = 3.14,
         | part6 = 'abc',
-        | part7 = true
+        | part7 = true,
+        | part8 = '2020-11-23',
+        | part9 = '2020-11-23T22:13:10.123456'
         |""".stripMargin
       sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
       assert(partTable.partitionExists(expectedPartition))

From d9901a823db28f476742af937a8fc872a3bda67b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 22:25:53 +0300
Subject: [PATCH 6/8] Rename the test

---
 .../sql/connector/AlterTablePartitionV2SQLSuite.scala     | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index eacfafb53386c..eef07cfa432b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -190,7 +190,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("ALTER TABLE ADD/DROP PARTITIONS: all types") {
+  test("universal type conversions of partition values") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
       sql(s"""
@@ -206,10 +206,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         | part8 date,
         | part9 timestamp
         |) USING foo
-        |PARTITIONED BY (
-        | part0, part1, part2, part3, part4, part5, part6, part7,
-        | part8, part9
-        |)""".stripMargin)
+        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
+        |""".stripMargin)
       val partTable = catalog("testpart").asTableCatalog
         .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
         .asPartitionable

From 2118d0a51c4fa4b363e5fdab1b9bb5d7411b6b4f Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 22:45:10 +0300
Subject: [PATCH 7/8] Specify raw type explicitly as StringType

---
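Note (kept below the "---" marker, which git am ignores): Literal(raw) infers
the literal's type from the runtime value, so a partition column missing from
the spec (raw == null) would come through as a NullType literal, while
Literal.create(raw, StringType) keeps the input typed as a string whether or
not it is null. A rough sketch of the difference, illustrative only:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.types.StringType

    Literal.create(null, StringType)  // Literal(null, StringType): a typed null string
    Literal("abc")                    // Literal(UTF8String "abc", StringType)

With the element type fixed to StringType, the surrounding Cast is always a
string-to-target conversion regardless of which spec values are present.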
 .../spark/sql/catalyst/analysis/ResolvePartitionSpec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 01f94ddb9c3d0..6d061fce06919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -66,7 +66,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
 
     val partValues = partSchema.map { part =>
       val raw = normalizedSpec.get(part.name).orNull
-      Cast(Literal(raw), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
+      Cast(Literal.create(raw, StringType), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
     }
     InternalRow.fromSeq(partValues)
   }

From d0108e097ccbd0531f5189750c129855c97f7378 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 23 Nov 2020 22:49:40 +0300
Subject: [PATCH 8/8] Add JIRA to the test title

---
 .../spark/sql/connector/AlterTablePartitionV2SQLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index eef07cfa432b6..4cacd5ec2b49e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -190,7 +190,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("universal type conversions of partition values") {
+  test("SPARK-33521: universal type conversions of partition values") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
       sql(s"""