-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22187][SS] Update unsaferow format for saved state in flatMapGroupsWithState to allow timeouts with deleted state #21739
Conversation
@@ -43,7 +43,7 @@ case class ObjectType(cls: Class[_]) extends DataType { | |||
|
|||
def asNullable: DataType = this | |||
|
|||
override def simpleString: String = cls.getName | |||
override def simpleString: String = s"Object[${cls.getName}]" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to make it differentiate between IntegerType and ObjectType with int
in it. Both show up in the simpleStreaing as int
.
Test build #92766 has finished for PR 21739 at commit
|
Test build #92790 has finished for PR 21739 at commit
|
jenkins retest this please |
Test build #92836 has finished for PR 21739 at commit
|
} | ||
|
||
|
||
private class StateManagerImplV2( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add docs explaining the state format
Test build #92853 has finished for PR 21739 at commit
|
Test build #92845 has finished for PR 21739 at commit
|
Test build #92846 has finished for PR 21739 at commit
|
LGTM |
Test build #93258 has finished for PR 21739 at commit
|
LGTM again |
What changes were proposed in this pull request?
Currently, the group state of user-defined-type is encoded as top-level columns in the UnsafeRows stores in the state store. The timeout timestamp is also saved as (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.
This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases. However, queries recovering from existing checkpoint will use the previous format to maintain compatibility with existing production queries.
How was this patch tested?
Refactored existing end-to-end tests and added new tests for explicitly testing obj-to-row conversion for both state formats.