[SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary

### What changes were proposed in this pull request?

This is a followup of #38151 to fix a performance issue: when a struct/array/map does not contain any char type field, we should not recreate the struct/array/map for nothing.
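
In the actual change (see the diff below), the checks are `struct.valExprs.forall(_.isInstanceOf[GetStructField])` for structs and `fastEquals` comparisons for arrays and maps. The toy sketch below (hypothetical `Col`, `MakeStruct`, `Pad` types, not Catalyst classes) only illustrates the general pattern of returning the original expression when the rewrite changes nothing:

```scala
// Minimal sketch of the "skip the rewrite when nothing changed" pattern.
// Col, MakeStruct and Pad are hypothetical stand-ins for Catalyst expressions.
sealed trait Expr
case class Col(name: String, isChar: Boolean) extends Expr
case class MakeStruct(fields: Seq[Expr]) extends Expr
case class Pad(child: Expr) extends Expr

def addReadSidePadding(e: Expr): Expr = e match {
  case c: Col if c.isChar => Pad(c)   // char column: wrap with padding
  case c: Col             => c        // non-char column: returned as-is
  case MakeStruct(fields) =>
    val rewritten = fields.map(addReadSidePadding)
    // Before this follow-up the struct was always rebuilt; now the original
    // expression is kept if no field actually needed padding.
    if (rewritten.zip(fields).forall { case (n, o) => n eq o }) e
    else MakeStruct(rewritten)
  case p: Pad => p
}

// A struct with no char fields comes back as the very same object.
val s = MakeStruct(Seq(Col("a", isChar = false), Col("b", isChar = false)))
assert(addReadSidePadding(s) eq s)
```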

### Why are the changes needed?

Fix a performance issue: without this change, the read-side padding rule rebuilds struct/array/map expressions even when none of their nested fields are char/varchar.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new test.
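
The new test (included in the diff below) builds a table whose only top-level char column is `c1` and asserts that, in the analyzed plan, the struct/array/map columns stay plain attribute references. A roughly equivalent manual check, assuming a running SparkSession `spark` and the same table `t` as in the test, could be:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.Project

val analyzed = spark.read.table("t").queryExecution.analyzed
analyzed.collect { case Project(projectList, _) =>
  // c1 is CHAR(5) and needs read-side padding; c2/c3/c4 contain no char
  // fields, so they should remain untouched Attribute references.
  assert(projectList.length == 4)
  assert(projectList.drop(1).forall(_.isInstanceOf[Attribute]))
}
```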

Closes #38479 from cloud-fan/char.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
cloud-fan authored and gengliangwang committed Nov 4, 2022
1 parent a2a8de9 commit 5196ff5
Showing 2 changed files with 43 additions and 8 deletions.
`CharVarcharUtils.scala`:

```diff
@@ -215,7 +215,10 @@ object CharVarcharUtils extends Logging {
           Seq(Literal(f.name), processStringForCharVarchar(
             GetStructField(expr, i, Some(f.name)), f.dataType, charFuncName, varcharFuncName))
         })
-        if (expr.nullable) {
+        if (struct.valExprs.forall(_.isInstanceOf[GetStructField])) {
+          // No field needs char/varchar processing, just return the original expression.
+          expr
+        } else if (expr.nullable) {
           If(IsNull(expr), Literal(null, struct.dataType), struct)
         } else {
           struct
@@ -225,11 +228,18 @@ object CharVarcharUtils extends Logging {
         processStringForCharVarcharInArray(expr, et, containsNull, charFuncName, varcharFuncName)
 
       case MapType(kt, vt, valueContainsNull) =>
+        val keys = MapKeys(expr)
         val newKeys = processStringForCharVarcharInArray(
-          MapKeys(expr), kt, containsNull = false, charFuncName, varcharFuncName)
+          keys, kt, containsNull = false, charFuncName, varcharFuncName)
+        val values = MapValues(expr)
         val newValues = processStringForCharVarcharInArray(
-          MapValues(expr), vt, valueContainsNull, charFuncName, varcharFuncName)
-        MapFromArrays(newKeys, newValues)
+          values, vt, valueContainsNull, charFuncName, varcharFuncName)
+        if (newKeys.fastEquals(keys) && newValues.fastEquals(values)) {
+          // If map key/value does not need char/varchar processing, return the original expression.
+          expr
+        } else {
+          MapFromArrays(newKeys, newValues)
+        }
 
       case _ => expr
     }
@@ -242,10 +252,13 @@ object CharVarcharUtils extends Logging {
       charFuncName: Option[String],
      varcharFuncName: Option[String]): Expression = {
     val param = NamedLambdaVariable("x", replaceCharVarcharWithString(et), containsNull)
-    val func = LambdaFunction(
-      processStringForCharVarchar(param, et, charFuncName, varcharFuncName),
-      Seq(param))
-    ArrayTransform(arr, func)
+    val funcBody = processStringForCharVarchar(param, et, charFuncName, varcharFuncName)
+    if (funcBody.fastEquals(param)) {
+      // If array element does not need char/varchar processing, return the original expression.
+      arr
+    } else {
+      ArrayTransform(arr, LambdaFunction(funcBody, Seq(param)))
+    }
   }
 
   def addPaddingForScan(attr: Attribute): Expression = {
```
`CharVarcharTestSuite.scala`:

```diff
@@ -18,7 +18,9 @@
 package org.apache.spark.sql
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.SchemaRequiredDataSource
 import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
@@ -862,6 +864,26 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
       }
     }
   }
+
+  test("SPARK-40697: read-side char padding should only be applied if necessary") {
+    withTable("t") {
+      sql(
+        s"""
+           |CREATE TABLE t (
+           |  c1 CHAR(5),
+           |  c2 STRUCT<i VARCHAR(5)>,
+           |  c3 ARRAY<VARCHAR(5)>,
+           |  c4 MAP<INT, VARCHAR(5)>
+           |) USING $format
+           |""".stripMargin)
+      spark.read.table("t").queryExecution.analyzed.foreach {
+        case Project(projectList, _) =>
+          assert(projectList.length == 4)
+          assert(projectList.drop(1).forall(_.isInstanceOf[Attribute]))
+        case _ =>
+      }
+    }
+  }
 }
 
 class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
```
