From e87d166a81c620c15cc94dfb15c17c9cacbbc9b6 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 23 Nov 2023 22:32:16 +0900 Subject: [PATCH] [SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference This PR proposes to sync the flag `isStreaming` from CTE definition to CTE reference. The essential issue is that CTE reference node cannot determine the flag `isStreaming` by itself, and never be able to have a proper value and always takes the default as it does not have a parameter in constructor. The other flag `resolved` is handled, and we need to do the same for `isStreaming`. Once we add the parameter to the constructor, we will also need to make sure the flag is in sync with CTE definition. We have a rule `ResolveWithCTE` doing the sync, hence we add the logic to sync the flag `isStreaming` as well. The bug may impact some rules which behaves differently depending on isStreaming flag. It would no longer be a problem once CTE reference is replaced with CTE definition at some point in "optimization phase", but all rules in analyzer and optimizer being triggered before the rule takes effect may misbehave based on incorrect isStreaming flag. No. New UT. No. Closes #43966 from HeartSaVioR/SPARK-46062. Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim (cherry picked from commit 43046631a5d4ac7201361a00473cc87fa52ab5a7) Signed-off-by: Jungtaek Lim --- .../catalyst/analysis/CTESubstitution.scala | 2 +- .../catalyst/analysis/ResolveWithCTE.scala | 2 +- .../optimizer/MergeScalarSubqueries.scala | 3 +- ...wnPredicatesAndPruneColumnsForCTEDef.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 1 + .../sql/catalyst/analysis/AnalysisSuite.scala | 15 ++++++ .../MergeScalarSubqueriesSuite.scala | 3 +- .../sql/streaming/StreamingQuerySuite.scala | 47 ++++++++++++++++++- 8 files changed, 69 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 77c687843c355..f047483b20fd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. - SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output)) + SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming)) } }.getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index 78b776f12f074..f1077378b2d9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] { case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => - CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output) + CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming) }.getOrElse { ref } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 6184160829ba6..ff0bc5e66d755 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { val subqueryCTE = header.plan.asInstanceOf[CTERelationDef] GetStructField( ScalarSubquery( - CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output), + CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output, + subqueryCTE.isStreaming), exprId = ssr.exprId), ssr.headerIndex) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala index f351ba0b39af9..4185967361607 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala @@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] { cteDef } - case cteRef @ CTERelationRef(cteId, _, output, _) => + case cteRef @ CTERelationRef(cteId, _, output, _, _) => val (cteDef, _, _, newAttrSet) = cteMap(cteId) if (newAttrSet.size < output.size) { val indices = newAttrSet.toSeq.map(cteDef.output.indexOf) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b5a2f0974242a..d775b72a5daff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -837,6 +837,7 @@ case class CTERelationRef( cteId: Long, _resolved: Boolean, override val output: Seq[Attribute], + override val isStreaming: Boolean, statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation { final override val nodePatterns: Seq[TreePattern] = Seq(CTE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9d51c41a6d8f7..8d9f9abc00b75 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers { // EventTimeWatermark node is NOT eliminated. assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark])) } + + test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") { + val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts")) + // Intentionally marking the flag _resolved to false, so that analyzer has a chance to sync + // the flag isStreaming on syncing the flag _resolved. + val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false) + val plan = WithCTE(cteRef, Seq(cteDef)).analyze + + val refs = plan.collect { + case r: CTERelationRef => r + } + assert(refs.length == 1) + assert(refs.head.resolved) + assert(refs.head.isStreaming) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala index 8af0e02855b12..13e138414781f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala @@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest { } private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = { - GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex) + GetStructField(ScalarSubquery( + CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex) .as("scalarsubquery()") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b889ac1897484..da9b579b397cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} @@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("SPARK-46062: streaming query reading from CTE, which refers to temp view from " + + "streaming source") { + val inputStream = MemoryStream[Int] + inputStream.toDF().createOrReplaceTempView("tv") + + val df = spark.sql( + """ + |WITH w as ( + | SELECT * FROM tv + |) + |SELECT value from w + |""".stripMargin) + + testStream(df)( + AddData(inputStream, 1, 2, 3), + CheckAnswer(1, 2, 3), + Execute { q => + var isStreamingForCteDef: Option[Boolean] = None + var isStreamingForCteRef: Option[Boolean] = None + + q.analyzedPlan.foreach { + case d: CTERelationDef => + assert(d.resolved, "The definition node must be resolved after analysis.") + isStreamingForCteDef = Some(d.isStreaming) + + case d: CTERelationRef => + assert(d.resolved, "The reference node must be marked as resolved after analysis.") + isStreamingForCteRef = Some(d.isStreaming) + + case _ => + } + + assert(isStreamingForCteDef.isDefined && isStreamingForCteRef.isDefined, + "Both definition and reference for CTE should be available in analyzed plan.") + + assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE definition, but " + + "isStreaming is set to false.") + + assert(isStreamingForCteDef === isStreamingForCteRef, + "isStreaming flag should be carried over from definition to reference, " + + s"definition: ${isStreamingForCteDef.get}, reference: ${isStreamingForCteRef.get}.") + } + ) + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir =>