diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6f201ba3a842f..5fef7900d7956 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -347,7 +347,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor Batch("Cleanup", fixedPoint, CleanupAliases), Batch("HandleSpecialCommand", Once, - HandleSpecialCommand) + HandleSpecialCommand), + Batch("Remove watermark for batch query", Once, + EliminateEventTimeWatermark) ) /** @@ -3938,7 +3940,7 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper { object EliminateEventTimeWatermark extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( _.containsPattern(EVENT_TIME_WATERMARK)) { - case EventTimeWatermark(_, _, child) if !child.isStreaming => child + case EventTimeWatermark(_, _, child) if child.resolved && !child.isStreaming => child } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 441b5fb6ca6fb..d035dcad81e78 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1724,4 +1724,27 @@ class AnalysisSuite extends AnalysisTest with Matchers { checkAnalysis(ident2.select($"a"), testRelation.select($"a").analyze) } } + + test("SPARK-46064 Basic functionality of elimination for watermark node in batch query") { + val dfWithEventTimeWatermark = EventTimeWatermark($"ts", + IntervalUtils.fromIntervalString("10 seconds"), batchRelationWithTs) + + val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker) + + // EventTimeWatermark node is eliminated via EliminateEventTimeWatermark. + assert(!analyzed.exists(_.isInstanceOf[EventTimeWatermark])) + } + + test("SPARK-46064 EliminateEventTimeWatermark properly handles the case where the child of " + + "EventTimeWatermark changes the isStreaming flag during resolution") { + // UnresolvedRelation which is batch initially and will be resolved as streaming + val dfWithTempView = UnresolvedRelation(TableIdentifier("streamingTable")) + val dfWithEventTimeWatermark = EventTimeWatermark($"ts", + IntervalUtils.fromIntervalString("10 seconds"), dfWithTempView) + + val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker) + + // EventTimeWatermark node is NOT eliminated. + assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark])) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index ba4e7b279f512..2ee434704d6c7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -82,6 +82,8 @@ trait AnalysisTest extends PlanTest { createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true) createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true) createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true) + createTempView(catalog, "streamingTable", TestRelations.streamingRelation, + overrideIfExists = true) new Analyzer(catalog) { override val extendedResolutionRules = extendedAnalysisRules } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala index d54237fcc1407..01b1a627e2871 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala @@ -68,4 +68,18 @@ object TestRelations { val mapRelation = LocalRelation( AttributeReference("map", MapType(IntegerType, IntegerType))()) + + val streamingRelation = LocalRelation( + Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("ts", TimestampType)() + ), + isStreaming = true) + + val batchRelationWithTs = LocalRelation( + Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("ts", TimestampType)() + ), + isStreaming = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ee56d1fa9acd3..2ebb43d4fba3e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1190,7 +1190,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown on watermark attribute #1") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, $"b".timestamp, attrC) + val relation = LocalRelation(Seq(attrA, $"b".timestamp, attrC), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1205,7 +1205,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown for nondeterministic filter") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, attrB, $"c".timestamp) + val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1221,7 +1221,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: full pushdown") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation(attrA, attrB, $"c".timestamp) + val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1236,7 +1236,7 @@ class FilterPushdownSuite extends PlanTest { test("watermark pushdown: no pushdown on watermark attribute #2") { val interval = new CalendarInterval(2, 2, 2000L) - val relation = LocalRelation($"a".timestamp, attrB, attrC) + val relation = LocalRelation(Seq($"a".timestamp, attrB, attrC), Nil, isStreaming = true) val originalQuery = EventTimeWatermark($"a", interval, relation) .where($"a" === new java.sql.Timestamp(0) && $"b" === 10) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 04a4563396053..d36aaef558663 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -778,8 +778,7 @@ class Dataset[T] private[sql]( val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) require(!IntervalUtils.isNegative(parsedDelay), s"delay threshold ($delayThreshold) should not be negative.") - EliminateEventTimeWatermark( - EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) + EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } }