Skip to content

Commit

Permalink
Add e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Sep 19, 2024
1 parent 601c9b3 commit d829f11
Showing 1 changed file with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{expr, lit, window}
import org.apache.spark.sql.functions.{count, expr, lit, timestamp_seconds, window}
import org.apache.spark.sql.internal.SQLConf

/**
Expand Down Expand Up @@ -524,4 +524,66 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
doTest(numExpectedStatefulOperatorsForOneEmptySource = 1)
}
}

test("SPARK-49699: observe node is not pruned out from PruneFilters") {
val input1 = MemoryStream[Int]
val df = input1.toDF()
.withColumn("eventTime", timestamp_seconds($"value"))
.observe("observation", count(lit(1)).as("rows"))
// Enforce PruneFilters to come into play and prune subtree. We could do the same
// with the reproducer of SPARK-48267, but let's just be simpler.
.filter(expr("false"))

testStream(df)(
AddData(input1, 1, 2, 3),
CheckNewAnswer(),
Execute { qe =>
val observeRow = qe.lastExecution.observedMetrics.get("observation")
assert(observeRow.get.getAs[Long]("rows") == 3L)
}
)
}

test("SPARK-49699: watermark node is not pruned out from PruneFilters") {
// NOTE: The test actually passes without SPARK-49699, because of the trickiness of
// filter pushdown and PruneFilters. Unlike observe node, the `false` filter is pushed down
// below to watermark node, hence PruneFilters rule does not prune out watermark node even
// before SPARK-49699. Propagate empty relation does not also propagate emptiness into
// watermark node, so the node is retained. The test is added for preventing regression.

val input1 = MemoryStream[Int]
val df = input1.toDF()
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "0 second")
// Enforce PruneFilter to come into play and prune subtree. We could do the same
// with the reproducer of SPARK-48267, but let's just be simpler.
.filter(expr("false"))

testStream(df)(
AddData(input1, 1, 2, 3),
CheckNewAnswer(),
Execute { qe =>
// If the watermark node is pruned out, this would be null.
assert(qe.lastProgress.eventTime.get("watermark") != null)
}
)
}

test("SPARK-49699: stateful operator node is not pruned out from PruneFilters") {
val input1 = MemoryStream[Int]
val df = input1.toDF()
.groupBy("value")
.count()
// Enforce PruneFilter to come into play and prune subtree. We could do the same
// with the reproducer of SPARK-48267, but let's just be simpler.
.filter(expr("false"))

testStream(df, OutputMode.Complete())(
AddData(input1, 1, 2, 3),
CheckNewAnswer(),
Execute { qe =>
assert(qe.lastProgress.stateOperators.length == 1)
}
)
}
}

0 comments on commit d829f11

Please sign in to comment.