From c3671942abbd5d96d7d2c7496a882be91533838b Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 23 Nov 2023 20:11:43 +0900 Subject: [PATCH] [SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child This PR proposes to move out EliminateEventTimeWatermark to the analyzer (one of the analysis rule), and also make a change to eliminate EventTimeWatermark node only when the child of EventTimeWatermark is "resolved". Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless whether child is resolved or not. It is not an issue for the usage of DataFrame API initiated by read / readStream, because streaming sources have the flag isStreaming set to true even it is yet resolved. But mix-up of SQL and DataFrame API would expose the issue; we may not know the exact value of isStreaming flag on unresolved node and it is subject to change upon resolution. No. New UTs. No. Closes #43971 from HeartSaVioR/SPARK-46064. Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim (cherry picked from commit a703dace0aa400fa24b2bded1500f44ae7ac8db0) Signed-off-by: Jungtaek Lim --- .../sql/catalyst/analysis/Analyzer.scala | 6 +++-- .../sql/catalyst/analysis/AnalysisSuite.scala | 23 +++++++++++++++++++ .../sql/catalyst/analysis/AnalysisTest.scala | 2 ++ .../sql/catalyst/analysis/TestRelations.scala | 14 +++++++++++ .../optimizer/FilterPushdownSuite.scala | 8 +++---- .../scala/org/apache/spark/sql/Dataset.scala | 3 +-- 6 files changed, 48 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 02b9c2445433b..8fe87a05d02d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -348,7 +348,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor Batch("Cleanup", fixedPoint, CleanupAliases), Batch("HandleSpecialCommand", Once, - HandleSpecialCommand) + HandleSpecialCommand), + Batch("Remove watermark for batch query", Once, + EliminateEventTimeWatermark) ) /** @@ -3844,7 +3846,7 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper { object EliminateEventTimeWatermark extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( _.containsPattern(EVENT_TIME_WATERMARK)) { - case EventTimeWatermark(_, _, child) if !child.isStreaming => child + case EventTimeWatermark(_, _, child) if child.resolved && !child.isStreaming => child } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 802b6d471a65c..843d51034aa2b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1668,4 +1668,27 @@ class AnalysisSuite extends AnalysisTest with Matchers { checkAnalysis(ident2.select($"a"), testRelation.select($"a").analyze) } } + + test("SPARK-46064 Basic functionality of elimination for watermark node in batch query") { + val dfWithEventTimeWatermark = EventTimeWatermark($"ts", + IntervalUtils.fromIntervalString("10 seconds"), batchRelationWithTs) + + val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker) + + // EventTimeWatermark node is eliminated via EliminateEventTimeWatermark. + assert(!analyzed.exists(_.isInstanceOf[EventTimeWatermark])) + } + + test("SPARK-46064 EliminateEventTimeWatermark properly handles the case where the child of " + + "EventTimeWatermark changes the isStreaming flag during resolution") { + // UnresolvedRelation which is batch initially and will be resolved as streaming + val dfWithTempView = UnresolvedRelation(TableIdentifier("streamingTable")) + val dfWithEventTimeWatermark = EventTimeWatermark($"ts", + IntervalUtils.fromIntervalString("10 seconds"), dfWithTempView) + + val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker) + + // EventTimeWatermark node is NOT eliminated. + assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark])) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 997308c6ef44f..5152666473286 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -84,6 +84,8 @@ trait AnalysisTest extends PlanTest { createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true) createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true) createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true) + createTempView(catalog, "streamingTable", TestRelations.streamingRelation, + overrideIfExists = true) new Analyzer(catalog) { override val extendedResolutionRules = extendedAnalysisRules } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala index d54237fcc1407..01b1a627e2871 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala @@ -68,4 +68,18 @@ object TestRelations { val mapRelation = LocalRelation( AttributeReference("map", MapType(IntegerType, IntegerType))()) + + val streamingRelation = LocalRelation( + Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("ts", TimestampType)() + ), + isStreaming = true) + + val batchRelationWithTs = LocalRelation( + Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("ts", TimestampType)() + ), + isStreaming = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ee56d1fa9acd3..2ebb43d4fba3e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1190,7 +1190,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown on watermark attribute #1") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, $"b".timestamp, attrC) + val relation = LocalRelation(Seq(attrA, $"b".timestamp, attrC), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1205,7 +1205,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown for nondeterministic filter") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, attrB, $"c".timestamp) + val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1221,7 +1221,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: full pushdown") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, attrB, $"c".timestamp) + val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1236,7 +1236,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown on watermark attribute #2") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation($"a".timestamp, attrB, attrC) + val relation = LocalRelation(Seq($"a".timestamp, attrB, attrC), Nil, isStreaming = true) val originalQuery = EventTimeWatermark($"a", interval, relation) .where($"a" === new java.sql.Timestamp(0) && $"b" === 10) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f53c6ddaa3880..c063af9381ff2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -774,8 +774,7 @@ class Dataset[T] private[sql]( val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) require(!IntervalUtils.isNegative(parsedDelay), s"delay threshold ($delayThreshold) should not be negative.") - EliminateEventTimeWatermark( - EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) + EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } /**