diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 77c687843c355..f047483b20fd0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. - SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output)) + SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming)) } }.getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index 78b776f12f074..f1077378b2d9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] { case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => - CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output) + CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming) }.getOrElse { ref } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 6184160829ba6..ff0bc5e66d755 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { val subqueryCTE = header.plan.asInstanceOf[CTERelationDef] GetStructField( ScalarSubquery( - CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output), + CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output, + subqueryCTE.isStreaming), exprId = ssr.exprId), ssr.headerIndex) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala index f351ba0b39af9..4185967361607 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala @@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] { cteDef } - case cteRef @ CTERelationRef(cteId, _, output, _) => + case cteRef @ CTERelationRef(cteId, _, output, _, _) => val (cteDef, _, _, newAttrSet) = cteMap(cteId) if (newAttrSet.size < output.size) { val indices = newAttrSet.toSeq.map(cteDef.output.indexOf) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b5a2f0974242a..d775b72a5daff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -837,6 +837,7 @@ case class CTERelationRef( cteId: Long, _resolved: Boolean, override val output: Seq[Attribute], + override val isStreaming: Boolean, statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation { final override val nodePatterns: Seq[TreePattern] = Seq(CTE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9d51c41a6d8f7..8d9f9abc00b75 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers { // EventTimeWatermark node is NOT eliminated. assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark])) } + + test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") { + val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts")) + // Intentionally marking the flag _resolved to false, so that analyzer has a chance to sync + // the flag isStreaming on syncing the flag _resolved. + val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false) + val plan = WithCTE(cteRef, Seq(cteDef)).analyze + + val refs = plan.collect { + case r: CTERelationRef => r + } + assert(refs.length == 1) + assert(refs.head.resolved) + assert(refs.head.isStreaming) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala index 8af0e02855b12..13e138414781f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala @@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest { } private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = { - GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex) + GetStructField(ScalarSubquery( + CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex) .as("scalarsubquery()") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b889ac1897484..da9b579b397cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} @@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("SPARK-46062: streaming query reading from CTE, which refers to temp view from " + + "streaming source") { + val inputStream = MemoryStream[Int] + inputStream.toDF().createOrReplaceTempView("tv") + + val df = spark.sql( + """ + |WITH w as ( + | SELECT * FROM tv + |) + |SELECT value from w + |""".stripMargin) + + testStream(df)( + AddData(inputStream, 1, 2, 3), + CheckAnswer(1, 2, 3), + Execute { q => + var isStreamingForCteDef: Option[Boolean] = None + var isStreamingForCteRef: Option[Boolean] = None + + q.analyzedPlan.foreach { + case d: CTERelationDef => + assert(d.resolved, "The definition node must be resolved after analysis.") + isStreamingForCteDef = Some(d.isStreaming) + + case d: CTERelationRef => + assert(d.resolved, "The reference node must be marked as resolved after analysis.") + isStreamingForCteRef = Some(d.isStreaming) + + case _ => + } + + assert(isStreamingForCteDef.isDefined && isStreamingForCteRef.isDefined, + "Both definition and reference for CTE should be available in analyzed plan.") + + assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE definition, but " + + "isStreaming is set to false.") + + assert(isStreamingForCteDef === isStreamingForCteRef, + "isStreaming flag should be carried over from definition to reference, " + + s"definition: ${isStreamingForCteDef.get}, reference: ${isStreamingForCteRef.get}.") + } + ) + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir =>