[SPARK-49467][SS] Add support for state data source reader and list state #47978
Conversation
cc - @HeartSaVioR @jingz-db - could you PTAL ? Thx
Looks good! Thanks for making the change. Left some small nits.
...rc/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@HeartSaVioR - could you PTAL ? thx
First pass; didn't go through the tests, as we'd probably need to reconsider the UX and it's better to review the tests later.
@@ -166,16 +184,22 @@ class StatePartitionReader(
    stateVariableInfoOpt match {
      case Some(stateVarInfo) =>
        val stateVarType = stateVarInfo.stateVariableType
        val hasTTLEnabled = stateVarInfo.ttlEnabled
Is it intentional to remove the TTL readability for value state? Just wanted to know whether we are giving up the functionality for a certain reason, or whether you want to defer this until every type is in the supported range and then think about a UX that works for all types.
Yea - for list state, we can't have a separate column for TTL, so we have to embed it as part of the value itself. So I'm trying to make it uniform for value state as well.
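A minimal Python sketch of the layout being described (all field names here, such as `ttlExpirationMs`, are hypothetical, not the actual reader schema): the TTL expiration rides inside each value struct, so value state and list state share the same value shape.

```python
# Sketch of the "TTL embedded in the value" layout (field names hypothetical).
def value_state_row(key, value, ttl_ms=None):
    """One row per grouping key; TTL lives inside the value struct."""
    val = {"value": value}
    if ttl_ms is not None:
        val["ttlExpirationMs"] = ttl_ms
    return {"key": key, "value": val}

def list_state_row(key, values, ttl_ms=None):
    """Same shape for list state: the value column is an array of that struct."""
    return {"key": key,
            "value": [value_state_row(key, v, ttl_ms)["value"] for v in values]}

row = value_state_row("user1", 42, ttl_ms=1000)
rows = list_state_row("user1", [1, 2], ttl_ms=1000)
```

With this layout there is no top-level TTL column to keep in sync across state types; TTL is simply absent from the value struct when it is not enabled.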
But say, if we explode elements in list state (and map state), we will be able to add a TTL column for them, and the schema for the value would be very similar (the only difference is a user key in map state). Do I understand correctly?
As commented in the other comment thread, I'm OK with this change, but let's see how we could deal with TTL in general. We can defer the decision until we see the full picture (across state types).
Could you please file a JIRA ticket to address TTL for all state types? Thanks!
OK, I see what you are doing now. It's just showing the value schema as passthrough, so TTL will be just a part of value if it's described. We do this in general (other built-in operators), so OK.
Correct yea
val expectedFieldNames = if (sourceOptions.readChangeFeed) {
  Seq("batch_id", "change_type", "key", "value", "partition_id")
} else if (transformWithStateVariableInfoOpt.isDefined) {
  val stateVarInfo = transformWithStateVariableInfoOpt.get
  val hasTTLEnabled = stateVarInfo.ttlEnabled
  val stateVarType = stateVarInfo.stateVariableType

  stateVarType match {
    case StateVariableType.ValueState =>
Now the schema is quite dynamic - have we run `explain` to check the schema is captured in dry-run as well?
Not sure what you mean. The only difference is the additional sub-column if TTL is enabled right ?
No, what I meant is whether inferSchema could capture all the details. I assume so but wanted to double confirm with the result of running "explain" for value state vs list state.
Please consider this as post-review comment. Not a blocker for merging this PR.
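As a rough illustration of why the inferred schema is dynamic (plain Python with hypothetical type and field names; this is not Spark's actual `inferSchema` code): the value struct gains a TTL field only when TTL is enabled, and list state wraps the struct in an array.

```python
def infer_value_schema(state_var_type, ttl_enabled):
    """Hypothetical sketch: the value struct gains a TTL field only when TTL
    is enabled, and list state wraps the struct in an array."""
    value_struct = ["value"] + (["ttlExpirationMs"] if ttl_enabled else [])
    if state_var_type == "ListState":
        return {"key": "struct", "value": {"array_of": value_struct}}
    return {"key": "struct", "value": value_struct}

value_schema = infer_value_schema("ValueState", True)
list_schema = infer_value_schema("ListState", False)
```

Because the shape depends on both the state variable type and the TTL flag, checking the plan output for each combination is a reasonable sanity check.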
case StateVariableType.ListState =>
  val key = pair.key
  val result = store.valuesIterator(key, stateVarName)
  var unsafeRowArr: Seq[UnsafeRow] = Seq.empty
Worth noting that we are materializing every element in the list here, so although we avoid the memory issue in ListState itself, the problem will pop up when reading ListState via the state data source reader.
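A small Python sketch of the memory concern (the `values_iterator` stand-in is hypothetical, mirroring `store.valuesIterator` in the diff above): collecting an iterator into a sequence materializes every element at once, while consuming it lazily keeps peak memory at one element.

```python
def values_iterator(n):
    """Stand-in for store.valuesIterator: yields list elements lazily."""
    for i in range(n):
        yield i

# Materializing (what the reader does above): the whole list sits in memory.
materialized = list(values_iterator(5))

# Streaming: each element is consumed before the next one is produced,
# so peak memory stays constant regardless of list length.
def stream_sum(n):
    total = 0
    for v in values_iterator(n):
        total += v
    return total
```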
I guess it's probably worth reconsidering the schema and UX, e.g. having `index` and `value` as columns for the value of list state and allowing multiple rows for the same state value. (You've used explode in the example, but we could just provide that result directly.) You'll also have the same issue with MapType, so it's worth considering. (Say, `index` here plays the same role as `userKey` in MapType.)
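The exploded alternative can be sketched in plain Python (the column names `index` and `value` follow the suggestion above; this is not the actual reader code):

```python
def explode_list_state(state):
    """Flatten {grouping_key: [v0, v1, ...]} into one (key, index, value)
    row per element - the alternative schema discussed for list state."""
    rows = []
    for key, values in state.items():
        for idx, v in enumerate(values):
            rows.append({"key": key, "index": idx, "value": v})
    return rows

rows = explode_list_state({"k1": [10, 20], "k2": [30]})
```

For map state the same shape applies with `index` replaced by the user key.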
For this, I was thinking of adding a `maxEntriesPerRow` type setting (sort of like pagination) which would bound the list/map entries for the same grouping key, but the schema would remain ArrayType/MapType per row. I was thinking of adding this as a separate change, though, after the base change is merged.
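A Python sketch of the hypothetical `maxEntriesPerRow` behavior described above (the setting name comes from this comment thread, not from any actual Spark option):

```python
def paginate_list_state(values, max_entries_per_row):
    """Hypothetical maxEntriesPerRow behavior: one output row per chunk,
    with the value column remaining an array (ArrayType) in every row."""
    return [values[i:i + max_entries_per_row]
            for i in range(0, len(values), max_entries_per_row)]

pages = paginate_list_state(list(range(7)), 3)
```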
I don't think that's easier than just exploding. Say users have to paginate and there are three rows for the same state value, holding 100 elements each in the array. That sits nowhere useful between "everything is in a single array and they can deal with SQL array functions - though not very powerful" and "it's flattened and they can do whatever operation per element, and they can even aggregate back".
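The "aggregate back" half of that argument can be sketched in plain Python (a stand-in for a group-by with a `collect_list`-style aggregation; not actual Spark code):

```python
from collections import defaultdict

def reassemble(rows):
    """Group exploded (key, index, value) rows back into per-key arrays,
    ordering elements by index (a collect_list-style aggregation)."""
    out = defaultdict(list)
    for r in sorted(rows, key=lambda r: (r["key"], r["index"])):
        out[r["key"]].append(r["value"])
    return dict(out)

state = reassemble([
    {"key": "k1", "index": 1, "value": 20},
    {"key": "k1", "index": 0, "value": 10},
])
```

With exploded rows, users can work per element and still recover the original arrays; paginated arrays support neither operation cleanly.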
I think it's even OK to not have an index and just have multiple rows for the same state key (it's the same as explode). Having an index would help us look into the detail of how array elements are stored, though.
But I don't feel like having 1000s of elements (which may not even be primitive) in an ArrayType is great overall.
cc. @cloud-fan I'd like to get some advice from Spark SQL expert. Could you please take a quick look into comments and provide some input? Thanks!
Discussed with @cloud-fan . We came to the conclusion that there is no good solution.
I'm OK with this - we have more time to revisit this (addition of TTL, etc.) and change our mind later if we think there is a better way.
Yea sounds good - we'll discuss offline and update the UX as needed
+1
Thanks! Merging to master.
What changes were proposed in this pull request?
Add support for state data source reader and list state.

Why are the changes needed?
This change adds support for reading state written using list state, which is used primarily within the stateful processor for the `transformWithState` operator.

Does this PR introduce any user-facing change?
Yes. Users can read state and `explode` entries using the following query:

How was this patch tested?
Added unit tests.

Was this patch authored or co-authored using generative AI tooling?
No.