From e5dd9e24ce03a9cc554ee2aa0470ea122aaae10e Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 17 Oct 2024 17:55:01 +0900 Subject: [PATCH] [SPARK-50007][SQL][SS] Provide default values for metrics on observe API when physical node is lost in executed plan --- .../spark/sql/execution/QueryExecution.scala | 2 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ ...treamingQueryOptimizationCorrectnessSuite.scala | 5 +++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 6ff2c5d4b9d32..1fa2ebce8ab81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -243,7 +243,7 @@ class QueryExecution( def toRdd: RDD[InternalRow] = lazyToRdd.get /** Get the metrics observed during the execution of the query plan. */ - def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan) + def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan, Some(analyzed)) protected def preparations: Seq[Rule[SparkPlan]] = { QueryExecution.preparations(sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e3346684285a9..d123800aa20ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4941,6 +4941,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row(Array(0), Array(0)), Row(Array(1), Array(1)), Row(Array(2), Array(2))) checkAnswer(df, expectedAnswer) } + + test("SPARK-50007: default metrics is provided even observe node is pruned out") { + val namedObservation = Observation("observation") + spark.range(10) + .observe(namedObservation, count(lit(1)).as("rows")) + // Enforce PruneFilters to come into play and prune subtree. We could do the same + // with the reproducer of SPARK-48267, but let's just be simpler. + .filter(expr("false")) + .collect() + + // This should produce the default value of metrics. Before SPARK-50007, the test fails + // because `namedObservation.getOrEmpty` is an empty Map. + assert(namedObservation.getOrEmpty.get("rows") === Some(0L)) + } } case class Foo(bar: Option[String]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala index 7e71d67f951c8..641f4668de3e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala @@ -587,7 +587,7 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest { ) } - test("SPARK-XXXXX: default metrics is provided even observe node is pruned out") { + test("SPARK-50007: default metrics is provided even observe node is pruned out") { // We disable SPARK-49699 to test the case observe node is pruned out. withSQLConf(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true") { val input1 = MemoryStream[Int] @@ -603,7 +603,8 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest { CheckNewAnswer(), Execute { qe => val observeRow = qe.lastExecution.observedMetrics.get("observation") - // This should not be null even though the observe node is pruned out. + // This should produce the default value of metrics. Before SPARK-50007, the test fails + // because `observeRow` is None. (Spark fails to find the metrics by name.) assert(observeRow.get.getAs[Long]("rows") == 0L) } )