[SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child

This PR proposes to move EliminateEventTimeWatermark out into the analyzer (as one of the analysis rules), and also to change it so that the EventTimeWatermark node is eliminated only when the child of EventTimeWatermark is resolved.

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied immediately against the child, regardless of whether the child is resolved or not.

This is not an issue for DataFrame API usage initiated by read / readStream, because streaming sources have the isStreaming flag set to true even before they are resolved. But mixing the SQL and DataFrame APIs exposes the issue: we may not know the exact value of the isStreaming flag on an unresolved node, and it is subject to change upon resolution.
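For illustration only (not taken from this PR), here is a minimal sketch of the SQL/DataFrame mix that exposes the issue; the rate source and the view name `rate_view` are assumptions for the example:

```scala
// A streaming source registered as a temp view, then referenced by name from the
// DataFrame API. At the time withWatermark is called, the plan underneath is an
// UnresolvedRelation whose isStreaming flag is still false; eagerly applying
// EliminateEventTimeWatermark at that point would wrongly drop the watermark node.
spark.readStream
  .format("rate")
  .load()
  .createOrReplaceTempView("rate_view")

val withWatermarkDf = spark.table("rate_view")            // plan is an UnresolvedRelation here
  .withWatermark("timestamp", "10 seconds")               // watermark must survive until the analyzer
                                                          // resolves the view to a streaming relation
```

Deferring the rule to the analyzer means the isStreaming check runs only after the child is resolved, when the flag's value is final.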

Does this PR introduce any user-facing change? No.

How was this patch tested? New UTs.

Was this patch authored or co-authored using generative AI tooling? No.

Closes #43971 from HeartSaVioR/SPARK-46064.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit a703dac)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR committed Nov 23, 2023
1 parent 0fd1501 commit cfe072a
Showing 6 changed files with 48 additions and 8 deletions.
@@ -342,7 +342,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
Batch("Cleanup", fixedPoint,
CleanupAliases),
Batch("HandleSpecialCommand", Once,
-HandleSpecialCommand)
+HandleSpecialCommand),
+Batch("Remove watermark for batch query", Once,
+EliminateEventTimeWatermark)
)

/**
@@ -3905,7 +3907,7 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper {
object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(EVENT_TIME_WATERMARK)) {
-case EventTimeWatermark(_, _, child) if !child.isStreaming => child
+case EventTimeWatermark(_, _, child) if child.resolved && !child.isStreaming => child
}
}

@@ -1443,4 +1443,27 @@ class AnalysisSuite extends AnalysisTest with Matchers {
val rel = LocalRelation(attr)
checkAnalysis(rel.select($"a"), rel.select(attr.markAsAllowAnyAccess()))
}

test("SPARK-46064 Basic functionality of elimination for watermark node in batch query") {
val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
IntervalUtils.fromIntervalString("10 seconds"), batchRelationWithTs)

val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker)

// EventTimeWatermark node is eliminated via EliminateEventTimeWatermark.
assert(!analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
}

test("SPARK-46064 EliminateEventTimeWatermark properly handles the case where the child of " +
"EventTimeWatermark changes the isStreaming flag during resolution") {
// UnresolvedRelation which is batch initially and will be resolved as streaming
val dfWithTempView = UnresolvedRelation(TableIdentifier("streamingTable"))
val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
IntervalUtils.fromIntervalString("10 seconds"), dfWithTempView)

val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker)

// EventTimeWatermark node is NOT eliminated.
assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
}
}
@@ -83,6 +83,8 @@ trait AnalysisTest extends PlanTest {
createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
createTempView(catalog, "streamingTable", TestRelations.streamingRelation,
overrideIfExists = true)
new Analyzer(catalog) {
override val extendedResolutionRules = extendedAnalysisRules
}
@@ -68,4 +68,18 @@ object TestRelations {

val mapRelation = LocalRelation(
AttributeReference("map", MapType(IntegerType, IntegerType))())

val streamingRelation = LocalRelation(
Seq(
AttributeReference("a", IntegerType)(),
AttributeReference("ts", TimestampType)()
),
isStreaming = true)

val batchRelationWithTs = LocalRelation(
Seq(
AttributeReference("a", IntegerType)(),
AttributeReference("ts", TimestampType)()
),
isStreaming = false)
}
@@ -1190,7 +1190,7 @@ class FilterPushdownSuite extends PlanTest {

test("watermark pushdown: no pushdown on watermark attribute #1") {
val interval = new CalendarInterval(2, 2, 2000L)
val relation = LocalRelation(attrA, $"b".timestamp, attrC)
val relation = LocalRelation(Seq(attrA, $"b".timestamp, attrC), Nil, isStreaming = true)

// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@@ -1205,7 +1205,7 @@

test("watermark pushdown: no pushdown for nondeterministic filter") {
val interval = new CalendarInterval(2, 2, 2000L)
-val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true)

// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@@ -1221,7 +1221,7 @@

test("watermark pushdown: full pushdown") {
val interval = new CalendarInterval(2, 2, 2000L)
-val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true)

// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
@@ -1236,7 +1236,7 @@

test("watermark pushdown: no pushdown on watermark attribute #2") {
val interval = new CalendarInterval(2, 2, 2000L)
val relation = LocalRelation($"a".timestamp, attrB, attrC)
val relation = LocalRelation(Seq($"a".timestamp, attrB, attrC), Nil, isStreaming = true)

val originalQuery = EventTimeWatermark($"a", interval, relation)
.where($"a" === new java.sql.Timestamp(0) && $"b" === 10)
3 changes: 1 addition & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -743,8 +743,7 @@ class Dataset[T] private[sql](
val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold)
require(!IntervalUtils.isNegative(parsedDelay),
s"delay threshold ($delayThreshold) should not be negative.")
-EliminateEventTimeWatermark(
-  EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
+EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}

/**
