Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32136][SQL] NormalizeFloatingNumbers should work on null struct #28962

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CreateArray, CreateMap, CreateNamedStruct, CreateStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, KnownFloatingPointNormalized, LambdaFunction, NamedLambdaVariable, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CreateArray, CreateMap, CreateNamedStruct, CreateStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, If, IsNull, KnownFloatingPointNormalized, LambdaFunction, Literal, NamedLambdaVariable, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, Window}
Expand Down Expand Up @@ -123,7 +123,8 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
val fields = expr.dataType.asInstanceOf[StructType].fields.indices.map { i =>
normalize(GetStructField(expr, i))
}
CreateStruct(fields)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why this works before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateStruct re-creates the struct when NormalizeFloatingNumbers is run. The original expr is not used. But now we wrap expr in IsNull.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, got it!

This reminds me that we can optimize the code a bit: Instead of only matching CreateNamedStruct, we can have a whitelist to apply the normalization to the children. For example, If(..., CreateNamedStruct(...)), we should apply normalization to the inner CreateNamedStruct.

We can do this later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For If(..., CreateNamedStruct(...)), it is struct type so already covered by the StructType case. So the normalization is already applied to the inner CreateNamedStruct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I know, but I mean we can do better. We can directly apply the normalization to the children in If(..., CreateNamedStruct(children)), instead of treating the outer If as a black box and apply normalization to it.

val struct = CreateStruct(fields)
If(IsNull(expr), Literal(null, struct.dataType), struct)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just wondering if we need a new Literal here. Maybe, can we simple just put expr?

- val struct = CreateStruct(fields)
- If(IsNull(expr), Literal(null, struct.dataType), struct)
+ If(IsNull(expr), expr, CreateStruct(fields))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AS-IS also looks good to me, too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

literal is better here to simplify the expression tree, and probably has better perf.


case _ if expr.dataType.isInstanceOf[ArrayType] =>
val ArrayType(et, containsNull) = expr.dataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,4 +1028,16 @@ class DataFrameAggregateSuite extends QueryTest
checkAnswer(df, Row("abellina", 2) :: Row("mithunr", 1) :: Nil)
}
}

test("SPARK-32136: NormalizeFloatingNumbers should work on null struct") {
val df = Seq(
A(None),
A(Some(B(None))),
A(Some(B(Some(1.0))))).toDF
val groupBy = df.groupBy("b").agg(count("*"))
checkAnswer(groupBy, Row(null, 1) :: Row(Row(null), 1) :: Row(Row(1.0), 1) :: Nil)
}
}

case class B(c: Option[Double])
case class A(b: Option[B])