[SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference

### What changes were proposed in this pull request?

This PR proposes to sync the flag `isStreaming` from CTE definition to CTE reference.

The essential issue is that a CTE reference node cannot determine the `isStreaming` flag by itself: because its constructor has no parameter for the flag, the node can never carry a proper value and always takes the default. The other flag, `resolved`, is already handled, and we need to do the same for `isStreaming`.

Once we add the parameter to the constructor, we also need to make sure the flag stays in sync with the CTE definition. The rule `ResolveWithCTE` already does this sync, so we add logic there to sync the `isStreaming` flag as well.
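
The sync described above can be sketched with a toy model. This is a minimal illustration under stated assumptions, not Spark's implementation: `ToyCteDef`, `ToyCteRef`, and `syncRefs` are hypothetical names that only mimic the shape of `CTERelationDef`, `CTERelationRef`, and the `ResolveWithCTE` case, and plain `Seq`s stand in for the logical plan.

```scala
// Toy model only: these types are NOT Spark's classes. They mimic the shape
// of a CTE definition and a CTE reference just enough to show the flag sync.
object CteFlagSyncSketch {
  final case class ToyCteDef(id: Long, resolved: Boolean, isStreaming: Boolean)
  final case class ToyCteRef(cteId: Long, resolved: Boolean, isStreaming: Boolean)

  // Rebuild every unresolved reference from its definition so that both
  // flags are copied together. Already-resolved references, and references
  // with no known definition, are left untouched.
  def syncRefs(defs: Seq[ToyCteDef], refs: Seq[ToyCteRef]): Seq[ToyCteRef] = {
    val defsById = defs.map(d => d.id -> d).toMap
    refs.map {
      case ref if !ref.resolved =>
        defsById.get(ref.cteId)
          .map(d => ToyCteRef(d.id, d.resolved, d.isStreaming))
          .getOrElse(ref)
      case ref => ref
    }
  }
}
```

Because the reference is rebuilt from the definition in one step, `resolved` and `isStreaming` cannot drift apart in this model: a reference is either untouched or carries both of the definition's flags.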

### Why are the changes needed?

The bug may affect rules that behave differently depending on the `isStreaming` flag. It is no longer a problem once the CTE reference is replaced with the CTE definition at some point in the optimization phase, but every analyzer and optimizer rule triggered before that replacement takes effect may misbehave because of the incorrect `isStreaming` flag.
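
To make the failure mode concrete, here is a small sketch of how a stale flag on a leaf can mislead a rule higher up in the plan. Everything in it is an assumption made for illustration: `ToyPlan`, `ToyRef`, `ToyFilter`, and `chooseHandling` are hypothetical, and the toy model simply lets a parent node inherit the flag from its child.

```scala
// Toy model only: hypothetical types, not Spark's LogicalPlan hierarchy.
object StaleStreamingFlagSketch {
  sealed trait ToyPlan { def isStreaming: Boolean }

  // Leaf standing in for a CTE reference; the flag is whatever it was built with.
  final case class ToyRef(cteId: Long, isStreaming: Boolean) extends ToyPlan

  // Parent node: in this toy model it inherits the flag from its child.
  final case class ToyFilter(child: ToyPlan) extends ToyPlan {
    def isStreaming: Boolean = child.isStreaming
  }

  // Hypothetical rule that branches on the flag.
  def chooseHandling(plan: ToyPlan): String =
    if (plan.isStreaming) "streaming" else "batch"
}
```

In this model, a reference to a streaming definition that is built with `isStreaming = false` makes `chooseHandling` pick the batch branch for the whole subtree, which is the kind of misbehavior the description warns about.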

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43966 from HeartSaVioR/SPARK-46062.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR committed Nov 23, 2023
1 parent 9c21238 commit 4304663
Showing 24 changed files with 239 additions and 176 deletions.
@@ -283,7 +283,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
 d.child
 } else {
 // Add a `SubqueryAlias` for hint-resolving rules to match relation names.
-SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
+SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
 }
 }.getOrElse(u)

@@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] {

 case ref: CTERelationRef if !ref.resolved =>
 cteDefMap.get(ref.cteId).map { cteDef =>
-CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
+CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
 }.getOrElse {
 ref
 }
@@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
 val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
 GetStructField(
 ScalarSubquery(
-CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output),
+CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output,
+subqueryCTE.isStreaming),
 exprId = ssr.exprId),
 ssr.headerIndex)
 } else {
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
 cteDef
 }

-case cteRef @ CTERelationRef(cteId, _, output, _) =>
+case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
 val (cteDef, _, _, newAttrSet) = cteMap(cteId)
 if (needsPruning(cteDef.child, newAttrSet)) {
 val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
@@ -862,6 +862,7 @@ case class CTERelationRef(
 cteId: Long,
 _resolved: Boolean,
 override val output: Seq[Attribute],
+override val isStreaming: Boolean,
 statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {

 final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
@@ -1573,7 +1573,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
 test("SPARK-43030: deduplicate relations in CTE relation definitions") {
 val join = testRelation.as("left").join(testRelation.as("right"))
 val cteDef = CTERelationDef(join)
-val cteRef = CTERelationRef(cteDef.id, false, Nil)
+val cteRef = CTERelationRef(cteDef.id, false, Nil, false)

 withClue("flat CTE") {
 val plan = WithCTE(cteRef.select($"left.a"), Seq(cteDef)).analyze
@@ -1586,7 +1586,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {

 withClue("nested CTE") {
 val cteDef2 = CTERelationDef(WithCTE(cteRef.join(testRelation), Seq(cteDef)))
-val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+val cteRef2 = CTERelationRef(cteDef2.id, false, Nil, false)
 val plan = WithCTE(cteRef2, Seq(cteDef2)).analyze
 val relations = plan.collect {
 case r: LocalRelation => r
@@ -1598,7 +1598,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {

 test("SPARK-43030: deduplicate CTE relation references") {
 val cteDef = CTERelationDef(testRelation.select($"a"))
-val cteRef = CTERelationRef(cteDef.id, false, Nil)
+val cteRef = CTERelationRef(cteDef.id, false, Nil, false)

 withClue("single reference") {
 val plan = WithCTE(cteRef.where($"a" > 1), Seq(cteDef)).analyze
@@ -1621,7 +1621,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {

 withClue("CTE relation has duplicated attributes") {
 val cteDef = CTERelationDef(testRelation.select($"a", $"a"))
-val cteRef = CTERelationRef(cteDef.id, false, Nil)
+val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
 val plan = WithCTE(cteRef.join(cteRef.select($"a")), Seq(cteDef)).analyze
 val refs = plan.collect {
 case r: CTERelationRef => r
@@ -1633,14 +1633,14 @@ class AnalysisSuite extends AnalysisTest with Matchers {
 withClue("CTE relation has duplicate aliases") {
 val alias = Alias($"a", "x")()
 val cteDef = CTERelationDef(testRelation.select(alias, alias).where($"x" === 1))
-val cteRef = CTERelationRef(cteDef.id, false, Nil)
+val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
 // Should not fail with the assertion failure: Found duplicate rewrite attributes.
 WithCTE(cteRef.join(cteRef), Seq(cteDef)).analyze
 }

 withClue("references in both CTE relation definition and main query") {
 val cteDef2 = CTERelationDef(cteRef.where($"a" > 2))
-val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+val cteRef2 = CTERelationRef(cteDef2.id, false, Nil, false)
 val plan = WithCTE(cteRef.union(cteRef2), Seq(cteDef, cteDef2)).analyze
 val refs = plan.collect {
 case r: CTERelationRef => r
@@ -1747,4 +1747,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
 // EventTimeWatermark node is NOT eliminated.
 assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
 }
+
+test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") {
+val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts"))
+// Intentionally marking the flag _resolved to false, so that analyzer has a chance to sync
+// the flag isStreaming on syncing the flag _resolved.
+val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false)
+val plan = WithCTE(cteRef, Seq(cteDef)).analyze
+
+val refs = plan.collect {
+case r: CTERelationRef => r
+}
+assert(refs.length == 1)
+assert(refs.head.resolved)
+assert(refs.head.isStreaming)
+}
 }
@@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest {
 }

 private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = {
-GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+GetStructField(ScalarSubquery(
+CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex)
 .as("scalarsubquery()")
 }

@@ -418,7 +418,7 @@ CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a"
 : +- OneRowRelation
 +- Project [a#x]
 +- SubqueryAlias v
-+- CTERelationRef xxxx, true, [a#x]
++- CTERelationRef xxxx, true, [a#x], false


 -- !query
@@ -438,7 +438,7 @@ Project [a1#x AS a2#x]
 : +- OneRowRelation
 +- Project [a#x]
 +- SubqueryAlias v
-+- CTERelationRef xxxx, true, [a#x]
++- CTERelationRef xxxx, true, [a#x], false


 -- !query
@@ -10,7 +10,7 @@ CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorI
 : +- OneRowRelation
 +- Project [col#x]
 +- SubqueryAlias s
-+- CTERelationRef xxxx, true, [col#x]
++- CTERelationRef xxxx, true, [col#x], false


 -- !query
@@ -32,7 +32,7 @@ CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, fals
 : +- OneRowRelation
 +- Project [col#x]
 +- SubqueryAlias s
-+- CTERelationRef xxxx, true, [col#x]
++- CTERelationRef xxxx, true, [col#x], false


 -- !query
@@ -49,7 +49,7 @@ Project [col#x]
 : +- OneRowRelation
 +- Project [col#x]
 +- SubqueryAlias s
-+- CTERelationRef xxxx, true, [col#x]
++- CTERelationRef xxxx, true, [col#x], false


 -- !query
@@ -64,7 +64,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
 : +- OneRowRelation
 +- Project [col#x]
 +- SubqueryAlias S
-+- CTERelationRef xxxx, true, [col#x]
++- CTERelationRef xxxx, true, [col#x], false


 -- !query
@@ -86,7 +86,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
 : +- OneRowRelation
 +- Project [col#x]
 +- SubqueryAlias s
-+- CTERelationRef xxxx, true, [col#x]
++- CTERelationRef xxxx, true, [col#x], false


 -- !query
@@ -15,10 +15,10 @@ WithCTE
 : +- SubqueryAlias t
 : +- Project [1#x]
 : +- SubqueryAlias t2
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 +- Project [1#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [1#x]
++- CTERelationRef xxxx, true, [1#x], false


 -- !query
@@ -37,7 +37,7 @@ Aggregate [max(c#x) AS max(c)#x]
 : +- OneRowRelation
 +- Project [c#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [c#x]
++- CTERelationRef xxxx, true, [c#x], false


 -- !query
@@ -54,7 +54,7 @@ Project [scalar-subquery#x [] AS scalarsubquery()#x]
 : : +- OneRowRelation
 : +- Project [1#x]
 : +- SubqueryAlias t
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 +- OneRowRelation


@@ -137,11 +137,11 @@ WithCTE
 : : : +- OneRowRelation
 : : +- Project [c#x]
 : : +- SubqueryAlias t
-: : +- CTERelationRef xxxx, true, [c#x]
+: : +- CTERelationRef xxxx, true, [c#x], false
 : +- OneRowRelation
 +- Project [scalarsubquery()#x]
 +- SubqueryAlias t2
-+- CTERelationRef xxxx, true, [scalarsubquery()#x]
++- CTERelationRef xxxx, true, [scalarsubquery()#x], false


 -- !query
@@ -191,7 +191,7 @@ WithCTE
 +- SubqueryAlias __auto_generated_subquery_name
 +- Project [c#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [c#x]
++- CTERelationRef xxxx, true, [c#x], false


 -- !query
@@ -220,7 +220,7 @@ WithCTE
 +- SubqueryAlias __auto_generated_subquery_name
 +- Project [c#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [c#x]
++- CTERelationRef xxxx, true, [c#x], false


 -- !query
@@ -255,7 +255,7 @@ WithCTE
 +- SubqueryAlias __auto_generated_subquery_name
 +- Project [c#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [c#x]
++- CTERelationRef xxxx, true, [c#x], false


 -- !query
@@ -358,14 +358,14 @@ WithCTE
 : +- SubqueryAlias t
 : +- Project [1#x]
 : +- SubqueryAlias t2
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 :- CTERelationDef xxxx, false
 : +- SubqueryAlias t2
 : +- Project [2 AS 2#x]
 : +- OneRowRelation
 +- Project [1#x]
 +- SubqueryAlias t
-+- CTERelationRef xxxx, true, [1#x]
++- CTERelationRef xxxx, true, [1#x], false


 -- !query
@@ -428,15 +428,15 @@ WithCTE
 : +- SubqueryAlias t3
 : +- Project [1#x]
 : +- SubqueryAlias t1
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 :- CTERelationDef xxxx, false
 : +- SubqueryAlias t2
 : +- Project [1#x]
 : +- SubqueryAlias t3
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 +- Project [1#x]
 +- SubqueryAlias t2
-+- CTERelationRef xxxx, true, [1#x]
++- CTERelationRef xxxx, true, [1#x], false


 -- !query
@@ -459,12 +459,12 @@ WithCTE
 : +- SubqueryAlias cte_inner
 : +- Project [1#x]
 : +- SubqueryAlias cte_outer
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 +- Project [1#x]
 +- SubqueryAlias __auto_generated_subquery_name
 +- Project [1#x]
 +- SubqueryAlias cte_inner
-+- CTERelationRef xxxx, true, [1#x]
++- CTERelationRef xxxx, true, [1#x], false


 -- !query
@@ -492,19 +492,19 @@ WithCTE
 : +- SubqueryAlias cte_inner_inner
 : +- Project [1#x]
 : +- SubqueryAlias cte_outer
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 :- CTERelationDef xxxx, false
 : +- SubqueryAlias cte_inner
 : +- Project [1#x]
 : +- SubqueryAlias __auto_generated_subquery_name
 : +- Project [1#x]
 : +- SubqueryAlias cte_inner_inner
-: +- CTERelationRef xxxx, true, [1#x]
+: +- CTERelationRef xxxx, true, [1#x], false
 +- Project [1#x]
 +- SubqueryAlias __auto_generated_subquery_name
 +- Project [1#x]
 +- SubqueryAlias cte_inner
-+- CTERelationRef xxxx, true, [1#x]
++- CTERelationRef xxxx, true, [1#x], false


 -- !query