Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child #43971

Closed
wants to merge 2 commits into from

Conversation

HeartSaVioR
Copy link
Contributor

What changes were proposed in this pull request?

This PR proposes to move out EliminateEventTimeWatermark to the analyzer (one of the analysis rule), and also make a change to eliminate EventTimeWatermark node only when the child of EventTimeWatermark is "resolved".

Why are the changes needed?

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless whether child is resolved or not.

It is not an issue for the usage of DataFrame API initiated by read / readStream, because streaming sources have the flag isStreaming set to true even it is yet resolved. But mix-up of SQL and DataFrame API would expose the issue; we may not know the exact value of isStreaming flag on unresolved node and it is subject to change upon resolution.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

…r and change to only take effect on resolved child
@HeartSaVioR HeartSaVioR changed the title [SPARK-46064][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child [SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child Nov 23, 2023
@github-actions github-actions bot added the SQL label Nov 23, 2023
@HeartSaVioR
Copy link
Contributor Author

cc. @cloud-fan PTAL, thanks!

@@ -347,7 +347,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
Batch("Cleanup", fixedPoint,
CleanupAliases),
Batch("HandleSpecialCommand", Once,
HandleSpecialCommand)
HandleSpecialCommand),
Batch("Remove watermark for batch query", Once,
Copy link
Contributor Author

@HeartSaVioR HeartSaVioR Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the node is expected to be already resolved from this point, but I'm checking the flag of resolved in EliminateEventTimeWatermark (again), to make the logic be self-describe.

@@ -68,4 +68,18 @@ object TestRelations {

val mapRelation = LocalRelation(
AttributeReference("map", MapType(IntegerType, IntegerType))())

val streamingRelation = LocalRelation(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also added in #43966 - I'll rebase when either one is merged.

@HeartSaVioR
Copy link
Contributor Author

https://github.com/HeartSaVioR/spark/actions/runs/6967344462/job/18959281391

failures do not relate to the change. SQL tests are all passing.

@HeartSaVioR
Copy link
Contributor Author

Thanks, merging to master/3.5/3.4.

HeartSaVioR added a commit that referenced this pull request Nov 23, 2023
…alyzer and change to only take effect on resolved child

This PR proposes to move out EliminateEventTimeWatermark to the analyzer (one of the analysis rule), and also make a change to eliminate EventTimeWatermark node only when the child of EventTimeWatermark is "resolved".

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless whether child is resolved or not.

It is not an issue for the usage of DataFrame API initiated by read / readStream, because streaming sources have the flag isStreaming set to true even it is yet resolved. But mix-up of SQL and DataFrame API would expose the issue; we may not know the exact value of isStreaming flag on unresolved node and it is subject to change upon resolution.

No.

New UTs.

No.

Closes #43971 from HeartSaVioR/SPARK-46064.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit a703dac)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR added a commit that referenced this pull request Nov 23, 2023
…alyzer and change to only take effect on resolved child

This PR proposes to move out EliminateEventTimeWatermark to the analyzer (one of the analysis rule), and also make a change to eliminate EventTimeWatermark node only when the child of EventTimeWatermark is "resolved".

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless whether child is resolved or not.

It is not an issue for the usage of DataFrame API initiated by read / readStream, because streaming sources have the flag isStreaming set to true even it is yet resolved. But mix-up of SQL and DataFrame API would expose the issue; we may not know the exact value of isStreaming flag on unresolved node and it is subject to change upon resolution.

No.

New UTs.

No.

Closes #43971 from HeartSaVioR/SPARK-46064.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit a703dac)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…alyzer and change to only take effect on resolved child

This PR proposes to move out EliminateEventTimeWatermark to the analyzer (one of the analysis rule), and also make a change to eliminate EventTimeWatermark node only when the child of EventTimeWatermark is "resolved".

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless whether child is resolved or not.

It is not an issue for the usage of DataFrame API initiated by read / readStream, because streaming sources have the flag isStreaming set to true even it is yet resolved. But mix-up of SQL and DataFrame API would expose the issue; we may not know the exact value of isStreaming flag on unresolved node and it is subject to change upon resolution.

No.

New UTs.

No.

Closes apache#43971 from HeartSaVioR/SPARK-46064.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit a703dac)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants