diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 75af56bdee828..4481063435847 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -215,7 +215,10 @@ object CharVarcharUtils extends Logging {
         Seq(Literal(f.name), processStringForCharVarchar(
           GetStructField(expr, i, Some(f.name)), f.dataType, charFuncName, varcharFuncName))
       })
-      if (expr.nullable) {
+      if (struct.valExprs.forall(_.isInstanceOf[GetStructField])) {
+        // No field needs char/varchar processing, just return the original expression.
+        expr
+      } else if (expr.nullable) {
         If(IsNull(expr), Literal(null, struct.dataType), struct)
       } else {
         struct
@@ -225,11 +228,18 @@ object CharVarcharUtils extends Logging {
       processStringForCharVarcharInArray(expr, et, containsNull, charFuncName, varcharFuncName)
 
     case MapType(kt, vt, valueContainsNull) =>
+      val keys = MapKeys(expr)
       val newKeys = processStringForCharVarcharInArray(
-        MapKeys(expr), kt, containsNull = false, charFuncName, varcharFuncName)
+        keys, kt, containsNull = false, charFuncName, varcharFuncName)
+      val values = MapValues(expr)
       val newValues = processStringForCharVarcharInArray(
-        MapValues(expr), vt, valueContainsNull, charFuncName, varcharFuncName)
-      MapFromArrays(newKeys, newValues)
+        values, vt, valueContainsNull, charFuncName, varcharFuncName)
+      if (newKeys.fastEquals(keys) && newValues.fastEquals(values)) {
+        // If map key/value does not need char/varchar processing, return the original expression.
+        expr
+      } else {
+        MapFromArrays(newKeys, newValues)
+      }
 
     case _ => expr
   }
@@ -242,10 +252,13 @@ object CharVarcharUtils extends Logging {
       charFuncName: Option[String],
      varcharFuncName: Option[String]): Expression = {
     val param = NamedLambdaVariable("x", replaceCharVarcharWithString(et), containsNull)
-    val func = LambdaFunction(
-      processStringForCharVarchar(param, et, charFuncName, varcharFuncName),
-      Seq(param))
-    ArrayTransform(arr, func)
+    val funcBody = processStringForCharVarchar(param, et, charFuncName, varcharFuncName)
+    if (funcBody.fastEquals(param)) {
+      // If array element does not need char/varchar processing, return the original expression.
+      arr
+    } else {
+      ArrayTransform(arr, LambdaFunction(funcBody, Seq(param)))
+    }
   }
 
   def addPaddingForScan(attr: Attribute): Expression = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 27a630c169be0..95c2e5085d92e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.SchemaRequiredDataSource
 import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
@@ -862,6 +864,26 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-40697: read-side char padding should only be applied if necessary") {
+    withTable("t") {
+      sql(
+        s"""
+           |CREATE TABLE t (
+           |  c1 CHAR(5),
+           |  c2 STRUCT<i: VARCHAR(5)>,
+           |  c3 ARRAY<VARCHAR(5)>,
+           |  c4 MAP<INT, VARCHAR(5)>
+           |) USING $format
+           |""".stripMargin)
+      spark.read.table("t").queryExecution.analyzed.foreach {
+        case Project(projectList, _) =>
+          assert(projectList.length == 4)
+          assert(projectList.drop(1).forall(_.isInstanceOf[Attribute]))
+        case _ =>
+      }
+    }
+  }
 }
 
 class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
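For reference, a minimal sketch of how the effect of this patch can be observed from a Spark session (the table name `demo` and the Parquet format are illustrative, not part of the patch):

    // Only c1 contains a CHAR type, so only it needs read-side padding.
    spark.sql("CREATE TABLE demo (c1 CHAR(5), c2 ARRAY<VARCHAR(5)>) USING parquet")
    val plan = spark.read.table("demo").queryExecution.analyzed
    // Previously c2 was rewritten into an ArrayTransform even though VARCHAR
    // needs no read-side processing; with this change it stays a bare attribute.
    plan.foreach {
      case p: org.apache.spark.sql.catalyst.plans.logical.Project =>
        // Expected output: Alias (padding expression for c1), AttributeReference (c2).
        p.projectList.foreach(e => println(e.getClass.getSimpleName))
      case _ =>
    }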