Skip to content

Commit

Permalink
[SPARK-50007][SQL][SS] Provide default values for metrics on observe API when physical node is lost in executed plan

Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Oct 17, 2024
1 parent 0dd8e10 commit e5dd9e2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class QueryExecution(
def toRdd: RDD[InternalRow] = lazyToRdd.get

/** Get the metrics observed during the execution of the query plan. */
def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)
def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan, Some(analyzed))

protected def preparations: Seq[Rule[SparkPlan]] = {
QueryExecution.preparations(sparkSession,
Expand Down
14 changes: 14 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4941,6 +4941,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Row(Array(0), Array(0)), Row(Array(1), Array(1)), Row(Array(2), Array(2)))
checkAnswer(df, expectedAnswer)
}

test("SPARK-50007: default metrics is provided even observe node is pruned out") {
  // Build a query whose observe node gets optimized away: a literal `false`
  // predicate lets PruneFilters drop the entire subtree. (The reproducer of
  // SPARK-48267 would exercise the same path, but this is simpler.)
  val obs = Observation("observation")
  val prunedQuery = spark.range(10)
    .observe(obs, count(lit(1)).as("rows"))
    .filter(expr("false"))
  prunedQuery.collect()

  // Even though the observe node never executed, the default metric values
  // must still be reported. Before SPARK-50007 this assertion failed because
  // `obs.getOrEmpty` came back as an empty Map.
  assert(obs.getOrEmpty.get("rows") === Some(0L))
}
}

case class Foo(bar: Option[String])
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
)
}

test("SPARK-XXXXX: default metrics is provided even observe node is pruned out") {
test("SPARK-50007: default metrics is provided even observe node is pruned out") {
// We disable SPARK-49699 to test the case observe node is pruned out.
withSQLConf(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true") {
val input1 = MemoryStream[Int]
Expand All @@ -603,7 +603,8 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
CheckNewAnswer(),
Execute { qe =>
val observeRow = qe.lastExecution.observedMetrics.get("observation")
// This should not be null even though the observe node is pruned out.
// This should produce the default value of metrics. Before SPARK-50007, the test fails
// because `observeRow` is None. (Spark fails to find the metrics by name.)
assert(observeRow.get.getAs[Long]("rows") == 0L)
}
)
Expand Down

0 comments on commit e5dd9e2

Please sign in to comment.