Skip to content

Commit b80e8cb

Browse files
bersprocketsdongjoon-hyun
authored andcommitted
[SPARK-46779][SQL] InMemoryRelation instances of the same cached plan should be semantically equivalent
### What changes were proposed in this pull request? When canonicalizing `output` in `InMemoryRelation`, use `output` itself as the schema for determining the ordinals, rather than `cachedPlan.output`. ### Why are the changes needed? `InMemoryRelation.output` and `InMemoryRelation.cachedPlan.output` don't necessarily use the same exprIds. E.g.: ``` +- InMemoryRelation [c1#340, c2#341], StorageLevel(disk, memory, deserialized, 1 replicas) +- LocalTableScan [c1#254, c2#255] ``` Because of this, `InMemoryRelation` will sometimes fail to fully canonicalize, resulting in cases where two semantically equivalent `InMemoryRelation` instances appear to be semantically nonequivalent. Example: ``` create or replace temp view data(c1, c2) as values (1, 2), (1, 3), (3, 7), (4, 5); cache table data; select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) from data d2 group by all; ``` If plan change validation checking is on (i.e., `spark.sql.planChangeValidation=true`), the failure is: ``` [PLAN_VALIDATION_FAILED_RULE_EXECUTOR] The input plan of org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 is invalid: Aggregate: Aggregate [c1#78, scalar-subquery#77 [c1#78]], [c1#78, scalar-subquery#77 [c1#78] AS scalarsubquery(c1)#90L, count(c2#79) AS count(c2)#83L] ... is not a valid aggregate expression: [SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar subquery '"scalarsubquery(c1)"' is neither present in GROUP BY, nor in an aggregate function. ``` If plan change validation checking is off, the failure is more mysterious: ``` [INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000 org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't find count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000 ``` If you remove the cache command, the query succeeds. The above failures happen because the subquery in the aggregate expressions and the subquery in the grouping expressions seem semantically nonequivalent since the `InMemoryRelation` in one of the subquery plans failed to completely canonicalize. In `CacheManager#useCachedData`, two lookups for the same cached plan may create `InMemoryRelation` instances that have different exprIds in `output`. That's because the plan fragments used as lookup keys may have been deduplicated by `DeduplicateRelations`, and thus have different exprIds in their respective output schemas. When `CacheManager#useCachedData` creates an `InMemoryRelation` instance, it borrows the output schema of the plan fragment used as the lookup key. The failure to fully canonicalize has other effects. For example, this query fails to reuse the exchange: ``` create or replace temp view data(c1, c2) as values (1, 2), (1, 3), (2, 4), (3, 7), (7, 22); cache table data; set spark.sql.autoBroadcastJoinThreshold=-1; set spark.sql.adaptive.enabled=false; select * from data l join data r on l.c1 = r.c1; ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#44806 from bersprockets/plan_validation_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 7706202 commit b80e8cb

File tree

3 files changed

+23
-1
lines changed

3 files changed

+23
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ case class InMemoryRelation(
416416
}
417417

418418
override def doCanonicalize(): logical.LogicalPlan =
419-
copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)),
419+
copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
420420
cacheBuilder,
421421
outputOrdering)
422422

sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

+15
Original file line numberDiff line numberDiff line change
@@ -2157,6 +2157,21 @@ class DataFrameAggregateSuite extends QueryTest
21572157

21582158
def createAggregate(df: DataFrame): DataFrame = df.groupBy("c0").agg(count("*"))
21592159
}
2160+
2161+
test("SPARK-46779: Group by subquery with a cached relation") {
2162+
withTempView("data") {
2163+
sql(
2164+
"""create or replace temp view data(c1, c2) as values
2165+
|(1, 2),
2166+
|(1, 3),
2167+
|(3, 7)""".stripMargin)
2168+
sql("cache table data")
2169+
val df = sql(
2170+
"""select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2)
2171+
|from data d2 group by all""".stripMargin)
2172+
checkAnswer(df, Row(1, 2, 2) :: Row(3, 1, 1) :: Nil)
2173+
}
2174+
}
21602175
}
21612176

21622177
case class B(c: Option[Double])

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala

+7
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
3434
assert(!relationCachedPlan.eq(clonedCachedPlan))
3535
assert(relationCachedPlan === clonedCachedPlan)
3636
}
37+
38+
test("SPARK-46779: InMemoryRelations with the same cached plan are semantically equivalent") {
39+
val d = spark.range(1)
40+
val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
41+
val r2 = r1.withOutput(r1.output.map(_.newInstance()))
42+
assert(r1.sameResult(r2))
43+
}
3744
}

0 commit comments

Comments
 (0)