[SPARK-49261][SQL] Don't replace literals in aggregate expressions with group-by expressions #47876
Conversation
Note this does result in a slight (but AFAIK harmless) oddity in the plan when the grouping expressions are included in the select statement, e.g.:
The aggregate operator will look like this:
In the list of grouping expressions,
withTempView("v1") {
  data.createOrReplaceTempView("v1")
  val df =
    sql("""select
nit. Could you capitalize SQL syntax, @bersprockets?
+1, LGTM.
cc @cloud-fan, @yaooqinn, @itholic (as a release manager of Apache Spark 3.5.3)
By the way, this is not a recent regression. I can reproduce it on 3.2.2. It's possible it's always been this way.
Merged to master/3.5/3.4.
late LGTM
late LGTM
What changes were proposed in this pull request?
Before this PR, `RewriteDistinctAggregates` could potentially replace literals in the aggregate expressions with output attributes from the `Expand` operator. This can occur when a group-by expression is a literal that happens by chance to match a literal used in an aggregate expression. E.g.:

```
create or replace temp view v1(a, b, c) as
values
  (1, 1.001d, 2),
  (2, 3.001d, 4),
  (2, 3.001, 4);

cache table v1;

select
  round(sum(b), 6) as sum1,
  count(distinct a) as count1,
  count(distinct c) as count2
from (
  select 6 as gb, * from v1
)
group by a, gb;
```

In the optimized plan, you can see that the literal 6 in the `round` function invocation has been patched with an output attribute (6#163) from the `Expand` operator:

```
== Optimized Logical Plan ==
'Aggregate [a#123, 6#163], [round(first(sum(__auto_generated_subquery_name.b)#167, true) FILTER (WHERE (gid#162 = 0)), 6#163) AS sum1#114, count(__auto_generated_subquery_name.a#164) FILTER (WHERE (gid#162 = 1)) AS count1#115L, count(__auto_generated_subquery_name.c#165) FILTER (WHERE (gid#162 = 2)) AS count2#116L]
+- Aggregate [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, sum(__auto_generated_subquery_name.b#166) AS sum(__auto_generated_subquery_name.b)#167]
   +- Expand [[a#123, 6, null, null, 0, b#124], [a#123, 6, a#123, null, 1, null], [a#123, 6, null, c#125, 2, null]], [a#123, 6#163, __auto_generated_subquery_name.a#164, __auto_generated_subquery_name.c#165, gid#162, __auto_generated_subquery_name.b#166]
      +- InMemoryRelation [a#123, b#124, c#125], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [a#6, b#7, c#8]
```

This is because the literal 6 was used in the group-by expressions (referred to as `gb` in the query, and renamed 6#163 in the `Expand` operator's output attributes).

After this PR, foldable expressions in the aggregate expressions are kept as-is.
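To make the intent concrete, here is a minimal sketch of the "keep foldable expressions as-is" idea, not the actual code change in `RewriteDistinctAggregates`; the name `patchAggregateExpression` and the shape of `groupByMap` are assumptions for illustration only:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Illustrative sketch. groupByMap is assumed to map each (canonicalized)
// group-by expression to the Expand output attribute that stands in for it.
// The !e.foldable guard is the key point: a constant such as the literal 6 used
// as round()'s scale argument is never replaced by an attribute like 6#163.
def patchAggregateExpression(
    expr: Expression,
    groupByMap: Map[Expression, Attribute]): Expression = {
  expr.transformDown {
    case e if !e.foldable && groupByMap.contains(e.canonicalized) =>
      groupByMap(e.canonicalized)
  }
}
```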
Why are the changes needed?
Some expressions require a foldable argument. In the above example, the `round` function requires a foldable expression as the scale argument. Because the scale argument is patched with an attribute, `RoundBase#checkInputDataTypes` returns an error, which leaves the `Aggregate` operator unresolved:

```
[INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:255)
  at org.apache.spark.sql.catalyst.types.DataTypeUtils$.$anonfun$fromAttributes$1(DataTypeUtils.scala:241)
  at scala.collection.immutable.List.map(List.scala:247)
  at scala.collection.immutable.List.map(List.scala:79)
  at org.apache.spark.sql.catalyst.types.DataTypeUtils$.fromAttributes(DataTypeUtils.scala:241)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:428)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:428)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:474)
  ...
```
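For reference, "foldable" is Catalyst's notion of an expression that can be evaluated to a constant at analysis time. A small illustration (assuming a Spark shell or test with spark-catalyst on the classpath):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

// A literal is foldable, so it is a valid scale argument for round()...
Literal(6).foldable                                // true
// ...but an attribute (like 6#163 produced by Expand) is not, which is why
// RoundBase#checkInputDataTypes fails once the literal has been patched.
AttributeReference("gb", IntegerType)().foldable   // false
```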
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New tests.
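As a rough idea of what such a regression test might look like (a sketch only, assuming a `QueryTest`-style suite where `sql` and `withTempView` are available; the test name and assertion are not the actual ones added by this PR):

```scala
test("SPARK-49261: do not replace literals in aggregate expressions with group-by attributes") {
  withTempView("v1") {
    sql("""CREATE OR REPLACE TEMP VIEW v1(a, b, c) AS
          |VALUES (1, 1.001d, 2), (2, 3.001d, 4), (2, 3.001, 4)""".stripMargin)
    val df = sql(
      """SELECT round(sum(b), 6) AS sum1,
        |  count(DISTINCT a) AS count1,
        |  count(DISTINCT c) AS count2
        |FROM (SELECT 6 AS gb, * FROM v1)
        |GROUP BY a, gb""".stripMargin)
    // Before the fix, collecting this query failed with
    // [INTERNAL_ERROR] Invalid call to dataType on unresolved object.
    assert(df.collect().length == 2)  // one row per (a, gb) group
  }
}
```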
Was this patch authored or co-authored using generative AI tooling?
No.