[SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference

This PR proposes to sync the flag `isStreaming` from the CTE definition to the CTE reference.

The essential issue is that the CTE reference node cannot determine the flag `isStreaming` by itself: the flag was not a constructor parameter, so the node could never carry a proper value and always took the default. The other flag, `resolved`, is already handled this way, and we need to do the same for `isStreaming`.
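
Concretely, the reference node's constructor had no such parameter before this change; a minimal sketch derived from the diff further down:

```scala
// Sketch of the pre-fix constructor (the diff below adds the missing
// isStreaming parameter). Without it, the node inherits LogicalPlan's default
// children.exists(_.isStreaming), which is always false for a leaf node.
case class CTERelationRef(
    cteId: Long,
    _resolved: Boolean,
    override val output: Seq[Attribute],
    statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation
```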

Once we add the parameter to the constructor, we also need to make sure the flag stays in sync with the CTE definition. The rule `ResolveWithCTE` already performs this sync for `resolved`, so we add logic there to sync the flag `isStreaming` as well.
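
A sketch of the relevant branch of `ResolveWithCTE` after the change, mirroring the diff below:

```scala
// When an unresolved reference is resolved against its definition, the rule now
// copies isStreaming along with the resolved flag and the output attributes.
case ref: CTERelationRef if !ref.resolved =>
  cteDefMap.get(ref.cteId).map { cteDef =>
    CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
  }.getOrElse(ref)
```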

The bug may impact rules that behave differently depending on the `isStreaming` flag. It stops being a problem once the CTE reference is replaced with the CTE definition at some point in the optimization phase, but any analyzer or optimizer rule triggered before that replacement may misbehave based on an incorrect `isStreaming` flag.
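
For illustration, a minimal sketch of how the synced flag can be observed on an analyzed plan, assuming a SparkSession `spark` and following the shape of the new `StreamingQuerySuite` test further down:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef}
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlCtx: SQLContext = spark.sqlContext

// Register a streaming source as a temp view and query it through a CTE.
val inputStream = MemoryStream[Int]
inputStream.toDF().createOrReplaceTempView("tv")
val df = spark.sql("WITH w AS (SELECT * FROM tv) SELECT value FROM w")

// With the fix, both the CTE definition and the CTE reference report
// isStreaming = true in the analyzed plan; previously the reference was false.
df.queryExecution.analyzed.foreach {
  case d: CTERelationDef => assert(d.isStreaming)
  case r: CTERelationRef => assert(r.isStreaming)
  case _ =>
}
```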

Does this PR introduce any user-facing change? No.

How was this patch tested? New UT.

Was this patch authored or co-authored using generative AI tooling? No.

Closes #43966 from HeartSaVioR/SPARK-46062.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 4304663)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR committed Nov 23, 2023
1 parent cfe072a commit e87d166
Showing 8 changed files with 69 additions and 6 deletions.
@@ -261,7 +261,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
d.child
} else {
// Add a `SubqueryAlias` for hint-resolving rules to match relation names.
- SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
+ SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
}
}.getOrElse(u)

@@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] {

case ref: CTERelationRef if !ref.resolved =>
cteDefMap.get(ref.cteId).map { cteDef =>
- CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
+ CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
}.getOrElse {
ref
}
@@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
GetStructField(
ScalarSubquery(
- CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output),
+ CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output,
+   subqueryCTE.isStreaming),
exprId = ssr.exprId),
ssr.headerIndex)
} else {
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
cteDef
}

- case cteRef @ CTERelationRef(cteId, _, output, _) =>
+ case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
if (newAttrSet.size < output.size) {
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
@@ -837,6 +837,7 @@ case class CTERelationRef(
cteId: Long,
_resolved: Boolean,
override val output: Seq[Attribute],
+ override val isStreaming: Boolean,
statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {

final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
@@ -1466,4 +1466,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
// EventTimeWatermark node is NOT eliminated.
assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
}

test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") {
val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts"))
// Intentionally mark the flag _resolved as false, so that the analyzer has a chance to sync
// the flag isStreaming while syncing the flag _resolved.
val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false)
val plan = WithCTE(cteRef, Seq(cteDef)).analyze

val refs = plan.collect {
case r: CTERelationRef => r
}
assert(refs.length == 1)
assert(refs.head.resolved)
assert(refs.head.isStreaming)
}
}
@@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest {
}

private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = {
- GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+ GetStructField(ScalarSubquery(
+   CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex)
.as("scalarsubquery()")
}

@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
- import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1317,6 +1317,51 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}

test("SPARK-46062: streaming query reading from CTE, which refers to temp view from " +
"streaming source") {
val inputStream = MemoryStream[Int]
inputStream.toDF().createOrReplaceTempView("tv")

val df = spark.sql(
"""
|WITH w as (
| SELECT * FROM tv
|)
|SELECT value from w
|""".stripMargin)

testStream(df)(
AddData(inputStream, 1, 2, 3),
CheckAnswer(1, 2, 3),
Execute { q =>
var isStreamingForCteDef: Option[Boolean] = None
var isStreamingForCteRef: Option[Boolean] = None

q.analyzedPlan.foreach {
case d: CTERelationDef =>
assert(d.resolved, "The definition node must be resolved after analysis.")
isStreamingForCteDef = Some(d.isStreaming)

case d: CTERelationRef =>
assert(d.resolved, "The reference node must be marked as resolved after analysis.")
isStreamingForCteRef = Some(d.isStreaming)

case _ =>
}

assert(isStreamingForCteDef.isDefined && isStreamingForCteRef.isDefined,
"Both definition and reference for CTE should be available in analyzed plan.")

assert(isStreamingForCteDef.get, "Expected isStreaming=true for CTE definition, but " +
"isStreaming is set to false.")

assert(isStreamingForCteDef === isStreamingForCteRef,
"isStreaming flag should be carried over from definition to reference, " +
s"definition: ${isStreamingForCteDef.get}, reference: ${isStreamingForCteRef.get}.")
}
)
}

private def checkExceptionMessage(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>