
[SPARK-49467][SS] Add support for state data source reader and list state #47978

Closed
wants to merge 3 commits

Conversation

Contributor

@anishshri-db anishshri-db commented Sep 3, 2024

What changes were proposed in this pull request?

Add support for state data source reader and list state

Why are the changes needed?

This change adds support for reading state written using list state, which is primarily used within the stateful processor with the transformWithState operator.

Does this PR introduce any user-facing change?

Yes

Users can read state and explode entries using the following query:

    // assumes: import spark.implicits._ and org.apache.spark.sql.functions.explode
    val stateReaderDf = spark.read
      .format("statestore")
      .option(StateSourceOptions.PATH, <checkpoint_location>)
      .option(StateSourceOptions.STATE_VAR_NAME, <state_var_name>)
      .load()

    val listStateDf = stateReaderDf
      .selectExpr(
        "key.value AS groupingKey",
        "list_value AS valueList",
        "partition_id")
      .select($"groupingKey",
        explode($"valueList").as("valueList"))
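For intuition, the explode step above turns each (groupingKey, valueList) row into one output row per list element. A minimal plain-Python sketch of that semantics (illustrative only, not the Spark API; the row data is made up):

```python
# Each input row pairs a grouping key with its full list-state value.
rows = [
    ("user-1", ["a", "b", "c"]),
    ("user-2", ["x"]),
]

def explode_rows(rows):
    # One output row per list element; the key is repeated for each element.
    # Rows whose list is empty produce no output, matching explode().
    return [(key, elem) for key, values in rows for elem in values]

print(explode_rows(rows))
# -> [('user-1', 'a'), ('user-1', 'b'), ('user-1', 'c'), ('user-2', 'x')]
```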

How was this patch tested?

Added unit tests

[info] Run completed in 1 minute, 3 seconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Was this patch authored or co-authored using generative AI tooling?

No

@anishshri-db anishshri-db changed the title [SPARK-49467] Add support for state data source reader and list state [SPARK-49467][SS] Add support for state data source reader and list state Sep 3, 2024
@github-actions github-actions bot added the SQL label Sep 3, 2024
@anishshri-db
Contributor Author

cc @HeartSaVioR @jingz-db - could you PTAL? Thanks!

Contributor

@jingz-db jingz-db left a comment

Looks good! Thanks for making the change. Left some small nits.

@anishshri-db
Contributor Author

@HeartSaVioR - could you PTAL? Thanks!

Contributor

@HeartSaVioR HeartSaVioR left a comment

First pass; I didn't go through the tests, since we'd probably need to reconsider the UX and it's better to review the tests later.

@@ -166,16 +184,22 @@ class StatePartitionReader(
stateVariableInfoOpt match {
case Some(stateVarInfo) =>
val stateVarType = stateVarInfo.stateVariableType
val hasTTLEnabled = stateVarInfo.ttlEnabled
Contributor

Is it intentional to remove the readable TTL column for value state? I just want to know whether we are giving up that functionality for a particular reason, or whether you want to defer it until every state type is supported and then think about a UX that works for all of them.

Contributor Author

@anishshri-db anishshri-db Sep 6, 2024

Yes - for list state, we can't have a separate column for TTL, so in that case we have to embed it as part of the value itself. I'm trying to make value state uniform with that as well.
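To make the tradeoff concrete, here is a hedged sketch of the two schema shapes being discussed: TTL as a separate top-level column (possible for value state) versus TTL embedded inside the value record (which also works for list elements). All field names here are hypothetical, for illustration only; they are not the actual state schema:

```python
# Value state with TTL as a separate top-level column (hypothetical fields).
value_state_separate_ttl = {
    "key": "user-1",
    "value": {"count": 3},
    "ttl_expiration_ms": 1_700_000_000_000,  # its own column
}

# Value state with TTL embedded inside the value record instead.
value_state_embedded_ttl = {
    "key": "user-1",
    "value": {"count": 3, "ttl_expiration_ms": 1_700_000_000_000},
}

# List state: there is one row per key but many elements, so a single
# top-level TTL column cannot describe per-element TTLs. Embedding the
# TTL in each element keeps the shape uniform with value state.
list_state_embedded_ttl = {
    "key": "user-1",
    "list_value": [
        {"item": "a", "ttl_expiration_ms": 1_700_000_000_000},
        {"item": "b", "ttl_expiration_ms": 1_700_000_100_000},
    ],
}
print(len(list_state_embedded_ttl["list_value"]))
```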

Contributor

@HeartSaVioR HeartSaVioR Sep 6, 2024

But say we explode the elements in list state (and map state); then we would be able to add a TTL column for them, and the value schema would be very similar (the only difference being the user key in map state). Do I understand correctly?

Contributor

@HeartSaVioR HeartSaVioR Sep 6, 2024

As commented in the other thread, I'm OK with this change, but let's see how we can deal with TTL in general. We can defer the decision until we see the full picture across state types.

Could you please file a JIRA ticket to address TTL for all state types? Thanks!

Contributor

@HeartSaVioR HeartSaVioR Sep 6, 2024

OK, I see what you are doing now. It just shows the value schema as a passthrough, so TTL will simply be part of the value if it's defined. We do this in general (for other built-in operators), so OK.

Contributor Author

Correct, yes.


val expectedFieldNames = if (sourceOptions.readChangeFeed) {
Seq("batch_id", "change_type", "key", "value", "partition_id")
} else if (transformWithStateVariableInfoOpt.isDefined) {
val stateVarInfo = transformWithStateVariableInfoOpt.get
val hasTTLEnabled = stateVarInfo.ttlEnabled
val stateVarType = stateVarInfo.stateVariableType

stateVarType match {
case StateVariableType.ValueState =>
Contributor

Now the schema is quite dynamic - have we run explain to check that the schema is also captured correctly in a dry run?

Contributor Author

Not sure what you mean. The only difference is the additional sub-column when TTL is enabled, right?

Contributor

No, what I meant is whether inferSchema can capture all the details. I assume so, but I wanted to double-check with the result of running explain for value state vs. list state.

Contributor

Please consider this as post-review comment. Not a blocker for merging this PR.

case StateVariableType.ListState =>
val key = pair.key
val result = store.valuesIterator(key, stateVarName)
var unsafeRowArr: Seq[UnsafeRow] = Seq.empty
Contributor

@HeartSaVioR HeartSaVioR Sep 6, 2024

Worth noting that we are materializing every element in the list here, so although we avoid the memory issue in ListState itself, the problem will pop up again when reading ListState via the state data source reader.

It's probably worth reconsidering the schema and UX, e.g. having index and value as columns for the value of list state and allowing multiple rows for the same state value. (You've used explode in the example, but we could just provide that result directly.) You'll also have the same issue with MapType, so it's worth considering there too. (Say, index here plays the same role as userKey in MapType.)
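The index-and-value suggestion amounts to the following semantics, sketched in plain Python (illustrative only; the function name is made up and this is not the Spark implementation):

```python
def explode_with_index(key, values):
    # One output row per element: (grouping key, element index, element).
    # For map state, the "index" position would instead carry the user key.
    return [(key, idx, elem) for idx, elem in enumerate(values)]

print(explode_with_index("user-1", ["a", "b", "c"]))
# -> [('user-1', 0, 'a'), ('user-1', 1, 'b'), ('user-1', 2, 'c')]
```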

Contributor Author

For this, I was thinking of adding something like a maxEntriesPerRow setting (a sort of pagination) that would bound the number of list/map entries per row for the same grouping key, while the schema would remain ArrayType/MapType per row. I was thinking of adding this as a separate change after the base change is merged, though.
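The pagination idea could look roughly like this sketch, where maxEntriesPerRow is the hypothetical option from the comment above (plain Python, not the Spark implementation):

```python
def paginate_list_state(key, values, max_entries_per_row):
    # Split one grouping key's list into several rows, each holding at most
    # max_entries_per_row elements; each row keeps an array (ArrayType) value.
    return [
        (key, values[i:i + max_entries_per_row])
        for i in range(0, len(values), max_entries_per_row)
    ]

print(paginate_list_state("user-1", list(range(7)), 3))
# -> [('user-1', [0, 1, 2]), ('user-1', [3, 4, 5]), ('user-1', [6])]
```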

Contributor

@HeartSaVioR HeartSaVioR Sep 6, 2024

I don't think that's easier than just exploding. Say users have to paginate and there are three rows for the same state value, each holding 100 elements in the array. That sits awkwardly between "everything is in a single array and users can rely on SQL array functions - though those are not very powerful" and "it's flattened, so users can do any operation per element and even aggregate back".

Contributor

I think it's even OK to not have an index and just have multiple rows for the same state key. (That's the same as explode.) Having an index would help us look into the details of how the array elements are stored, though.

But I don't feel like having thousands of elements (even non-primitive ones) in an ArrayType is great overall.

Contributor

cc @cloud-fan - I'd like to get some advice from a Spark SQL expert. Could you please take a quick look at the comments and provide some input? Thanks!

Contributor

Discussed with @cloud-fan. We came to the conclusion that there is no good solution.

I'm OK with this - we have more time to revisit this (addition of TTL, etc.) and change our mind later if we think there is a better way.

Contributor Author

Yeah, sounds good - we'll discuss offline and update the UX as needed.

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR
Contributor

Thanks! Merging to master.
