[SPARK-33521][SQL] Universal type conversion in resolving V2 partition specs #30474
Conversation
Test build #131572 has started for PR 30474 at commit.
Kubernetes integration test starting.
Kubernetes integration test status: success.
    }
    }
    val raw = normalizedSpec.get(part.name).orNull
    Cast(Literal.create(raw, StringType), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
Is this matched to the V1 partitioning type coercion? I remember its rules are a bit different; see PartitioningUtils.inferPartitionColumnValue.
We don't have unified tests for V1 and V2 ALTER TABLE .. ADD/DROP PARTITION at the moment. I plan to add them soon. Once we have such tests, we will see the differences and fix them.
For now, I am just trying to keep the implementation simple: cast partition values according to the partition schema.
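The idea here — each raw partition value arrives as a string and is converted to the type declared in the partition schema — can be sketched in plain Scala. The types and the `convert` function below are simplified stand-ins for illustration, not Spark's actual `Cast`/`DataType` API:

```scala
// Simplified stand-in for Cast-based partition value conversion:
// every raw value in a partition spec is a string, and the partition
// schema says which type it should become.
sealed trait PartType
case object IntType extends PartType
case object DoubleType extends PartType
case object StrType extends PartType

def convert(raw: String, dt: PartType): Any = dt match {
  case IntType    => raw.toInt
  case DoubleType => raw.toDouble
  case StrType    => raw
}

// A two-column partition schema and a spec such as PARTITION (id=42, ratio=0.5):
val schema = Seq(("id", IntType), ("ratio", DoubleType))
val spec   = Map("id" -> "42", "ratio" -> "0.5")
val row: Seq[Any] = schema.map { case (name, dt) => convert(spec(name), dt) }
```

The real implementation delegates to `Cast`, so it inherits Spark's full coercion rules (dates, timestamps, decimals, session time zone) instead of hand-written parsers like these.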
I think V1 does the same, see
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
Lines 149 to 165 in dfa6fb4
    /**
     * Given the partition schema, returns a row with that schema holding the partition values.
     */
    def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
      val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
      val timeZoneId = caseInsensitiveProperties.getOrElse(
        DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
      InternalRow.fromSeq(partitionSchema.map { field =>
        val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
          null
        } else {
          spec(field.name)
        }
        Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
      })
    }
    }
Let me extract the code to PartitioningUtils, and re-use it in V2.
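As a rough sketch, such a shared helper could factor out the one piece of logic both paths duplicate: mapping the default-partition marker to null before conversion. Everything below (object name, method name, the `parse` parameter) is a hypothetical simplification, not Spark's actual API:

```scala
// Hypothetical, simplified stand-in for a shared partition-value
// converter that both the V1 and V2 resolvers could call. Spark's real
// code works with Cast / Literal / InternalRow; plain Scala functions
// are used here to keep the sketch self-contained.
object PartitioningUtilsSketch {
  // Spark's marker for a null partition value.
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Convert one raw partition value; the default-partition marker maps to null.
  def convertRaw(raw: String, parse: String => Any): Any =
    if (raw == DefaultPartitionName) null else parse(raw)
}

// Both a V1-style and a V2-style caller would share the same conversion:
val asInt  = PartitioningUtilsSketch.convertRaw("42", _.toInt)
val asNull = PartitioningUtilsSketch.convertRaw(
  PartitioningUtilsSketch.DefaultPartitionName, identity)
```

Centralizing this avoids the V1/V2 behavior drift discussed above, since any later change to the coercion rules lands in one place.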
Gotya
I reused the code from DSv1 in #30482 and fixed an issue. @HyukjinKwon Please review it.
Test build #131579 has finished for PR 30474 at commit.
thanks, merging to master!
What changes were proposed in this pull request?
In the PR, I propose to change the resolver of partition specs used in V2 ALTER TABLE .. ADD/DROP PARTITION (at the moment), and re-use CAST to convert partition values to the desired types according to the partition schema.
Why are the changes needed?
Currently, the resolver of V2 partition specs supports just a few types:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
Line 72 in 23e9920
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
By running AlterTablePartitionV2SQLSuite.
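For reference, an individual Spark test suite like this is typically run through sbt's testOnly task; the exact module name depends on where the suite lives in the build, so treat this invocation as a sketch rather than the verified command:

```shell
# Run a single suite from the Spark source tree (module name assumed).
build/sbt "sql/testOnly *AlterTablePartitionV2SQLSuite"
```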