[SPARK-33521][SQL] Universal type conversion in resolving V2 partition specs #30474

Closed · wants to merge 8 commits
ResolvePartitionSpec.scala
@@ -17,9 +17,9 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -65,31 +65,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
       conf.resolver)

     val partValues = partSchema.map { part =>
-      val partValue = normalizedSpec.get(part.name).orNull
-      if (partValue == null) {
-        null
-      } else {
-        // TODO: Support other datatypes, such as DateType
-        part.dataType match {
-          case _: ByteType =>
-            partValue.toByte
-          case _: ShortType =>
-            partValue.toShort
-          case _: IntegerType =>
-            partValue.toInt
-          case _: LongType =>
-            partValue.toLong
-          case _: FloatType =>
-            partValue.toFloat
-          case _: DoubleType =>
-            partValue.toDouble
-          case _: StringType =>
-            partValue
-          case _ =>
-            throw new AnalysisException(
-              s"Type ${part.dataType.typeName} is not supported for partition.")
-        }
-      }
+      val raw = normalizedSpec.get(part.name).orNull
+      Cast(Literal.create(raw, StringType), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
Member:
Is this matched to the V1 partitioning type coercion? I remember it has slightly different rules; see PartitioningUtils.inferPartitionColumnValue.

Member Author:
We don't have unified tests for V1 and V2 ALTER TABLE .. ADD/DROP PARTITION at the moment. I plan to add them soon. As soon as we have such tests, we will see the differences and fix them.

For now, I just try to keep the implementation simpler: cast partition values according to the partition schema.
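
A minimal standalone sketch of that idea, using Catalyst's Cast directly (the toPartitionValue helper here is illustrative only, not part of this PR):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType}

    object CastSketch extends App {
      // Illustrative helper: convert one raw partition-spec string into the
      // Catalyst value matching the partition column's data type.
      def toPartitionValue(raw: String, dataType: DataType, timeZoneId: String): Any =
        Cast(Literal.create(raw, StringType), dataType, Some(timeZoneId)).eval()

      println(toPartitionValue("1", IntegerType, "UTC"))        // 1
      println(toPartitionValue("2020-11-23", DateType, "UTC"))  // days since epoch (18589)
      println(toPartitionValue(null, IntegerType, "UTC"))       // null for a missing value
    }

One Cast handles every supported type, so the per-type match above becomes unnecessary.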

Member Author:
I think V1 does the same, see:

  /**
   * Given the partition schema, returns a row with that schema holding the partition values.
   */
  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
    val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
    val timeZoneId = caseInsensitiveProperties.getOrElse(
      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
    InternalRow.fromSeq(partitionSchema.map { field =>
      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
        null
      } else {
        spec(field.name)
      }
      Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
    })
  }

Let me extract the code to PartitioningUtils and reuse it in V2.
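
For illustration, the extracted helper could look roughly like this (a sketch only; the object and method names here are hypothetical, not the actual code of #30482):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{StringType, StructType}

    // Hypothetical shared helper that V1 and V2 partition resolution could both call.
    object PartitioningUtilsSketch {
      // Builds a row matching `partitionSchema` from raw string partition values:
      // a missing value becomes null, everything else is cast to the column's type.
      def toPartitionRow(
          spec: Map[String, String],
          partitionSchema: StructType,
          timeZoneId: String): InternalRow = {
        InternalRow.fromSeq(partitionSchema.map { field =>
          val raw = spec.get(field.name).orNull
          Cast(Literal.create(raw, StringType), field.dataType, Some(timeZoneId)).eval()
        })
      }
    }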

Member:
Gotcha

MaxGekk (Member Author), Nov 24, 2020:
I reused the code from DSv1 in #30482 and fixed an issue. @HyukjinKwon Please review it.

     }
     InternalRow.fromSeq(partValues)
   }
AlterTablePartitionV2SQLSuite.scala
@@ -17,12 +17,16 @@

 package org.apache.spark.sql.connector

+import java.time.{LocalDate, LocalDateTime}
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String

 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {

@@ -185,4 +189,58 @@
       }
     }
   }
+
+  test("SPARK-33521: universal type conversions of partition values") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"""
+        |CREATE TABLE $t (
+        |  part0 tinyint,
+        |  part1 smallint,
+        |  part2 int,
+        |  part3 bigint,
+        |  part4 float,
+        |  part5 double,
+        |  part6 string,
+        |  part7 boolean,
+        |  part8 date,
+        |  part9 timestamp
+        |) USING foo
+        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
+        |""".stripMargin)
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      val expectedPartition = InternalRow.fromSeq(Seq[Any](
+        -1,    // tinyint
+        0,     // smallint
+        1,     // int
+        2,     // bigint
+        3.14F, // float
+        3.14D, // double
+        UTF8String.fromString("abc"), // string
+        true,  // boolean
+        LocalDate.parse("2020-11-23").toEpochDay,
+        DateTimeUtils.instantToMicros(
+          LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
+      ))
+      assert(!partTable.partitionExists(expectedPartition))
+      val partSpec = """
+        |  part0 = -1,
+        |  part1 = 0,
+        |  part2 = 1,
+        |  part3 = 2,
+        |  part4 = 3.14,
+        |  part5 = 3.14,
+        |  part6 = 'abc',
+        |  part7 = true,
+        |  part8 = '2020-11-23',
+        |  part9 = '2020-11-23T22:13:10.123456'
+        |""".stripMargin
+      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
+      assert(partTable.partitionExists(expectedPartition))
+      sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
+      assert(!partTable.partitionExists(expectedPartition))
+    }
+  }
 }