From 4098d26084e9c743445bc7db1320d6e1e62c8bd5 Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Thu, 10 Feb 2022 21:55:58 +0800
Subject: [PATCH] [SPARK-35531][SQL][3.1] Directly pass hive Table to HiveClient when call getPartitions to avoid unnecessary convert from HiveTable -> CatalogTable -> HiveTable

### What changes were proposed in this pull request?

Currently, `HiveExternalCatalog.listPartitions` goes through

```
  final def getPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
    getPartitions(getTable(db, table), partialSpec)
  }
```

It calls `getTable` to fetch the raw Hive table and convert it to a `CatalogTable`, and the overloaded `getPartitions` then converts it back to a Hive table. This round-trip causes a conflict: the schema is stored in lowercase, while the bucket and sort columns are not lowercased.

In this PR, we pass the raw Hive table directly to the `HiveClient` request, avoiding the unnecessary conversion and the potential conflict, and respecting case sensitivity.
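To make the conflict concrete, here is a minimal plain-Scala sketch of the kind of case-sensitive check the round-trip runs into. It is a hypothetical stand-in, not the actual Hive or Spark code; the column names mirror the repro below:

```scala
// Hypothetical sketch of the mismatch described above, standing in for the
// validation behind Hive's Table.setBucketCols (see the stack trace below);
// not the real implementation.
object BucketColumnCaseSketch {
  def main(args: Array[String]): Unit = {
    val tableColumns = Seq("v1", "s1") // schema restored from the metastore, lowercased
    val bucketColumns = Seq("V1")      // bucket columns keep the case the user typed
    // A case-sensitive membership check rejects "V1" even though the column exists.
    val missing = bucketColumns.filterNot(tableColumns.contains)
    if (missing.nonEmpty) {
      // Mirrors the repro error: "Bucket columns V1 is not part of the table columns (...)"
      println(s"Bucket columns ${missing.mkString(", ")} is not part of the table columns $tableColumns")
    }
  }
}
```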
### Why are the changes needed?

When a user creates a Hive bucketed table with an upper-case schema, the table schema is stored in lowercase while the bucket column info keeps the user's original input. If we then insert into this table, a `HiveException` is thrown reporting that the bucket column is not part of the table schema. Here is a simple repro:

```
spark.sql("""
  CREATE TABLE TEST1(
  V1 BIGINT,
  S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET
  """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
```

Error message:

```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more
```
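With this change, the same statements are expected to succeed regardless of identifier case. As a rough illustration of what the new `InsertSuite` test further down exercises (a sketch assuming a `spark` session with Hive support, e.g. in spark-shell; not part of the patch itself):

```scala
// Condensed sketch of the scenario covered by the new UT below; the table and
// column names are taken from the repro above.
spark.sql(
  """CREATE TABLE TEST1(
    |V1 BIGINT,
    |S1 INT)
    |PARTITIONED BY (PK BIGINT)
    |CLUSTERED BY (V1)
    |SORTED BY (S1)
    |INTO 200 BUCKETS
    |STORED AS PARQUET""".stripMargin)
// Both inserts should now resolve the bucket column, since the raw Hive table is
// passed to the client directly instead of being rebuilt from the CatalogTable.
spark.sql("insert into test1 select * from values(1,1,1)")
spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)")
```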
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #35475 from AngersZhuuuu/SPARK-35531-3.1.

Authored-by: Angerszhuuuu
Signed-off-by: Wenchen Fan
---
 .../spark/sql/hive/client/HiveClient.scala    | 14 +----
 .../sql/hive/client/HiveClientImpl.scala      | 19 ++++---
 .../apache/spark/sql/hive/InsertSuite.scala   | 51 +++++++++++++++++++
 .../spark/sql/hive/client/VersionsSuite.scala |  4 +-
 4 files changed, 67 insertions(+), 21 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 48f3837740933..2295497b016f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -215,20 +215,10 @@ private[hive] trait HiveClient {
    * Returns the partitions for the given table that match the supplied partition spec.
    * If no partition spec is specified, all partitions are returned.
    */
-  final def getPartitions(
+  def getPartitions(
       db: String,
       table: String,
-      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
-    getPartitions(getTable(db, table), partialSpec)
-  }
-
-  /**
-   * Returns the partitions for the given table that match the supplied partition spec.
-   * If no partition spec is specified, all partitions are returned.
-   */
-  def getPartitions(
-      catalogTable: CatalogTable,
-      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
+      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
 
   /**
    * Returns partitions filtered by predicates for the given table.
    */
   def getPartitionsByFilter(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0d45af245a519..55f7adcc3f2bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -722,14 +722,19 @@ private[hive] class HiveClientImpl(
     Option(hivePartition).map(fromHivePartition)
   }
 
-  /**
-   * Returns the partitions for the given table that match the supplied partition spec.
-   * If no partition spec is specified, all partitions are returned.
-   */
   override def getPartitions(
-      table: CatalogTable,
+      db: String,
+      table: String,
+      spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
+    val hiveTable = withHiveState {
+      getRawTableOption(db, table).getOrElse(throw new NoSuchTableException(db, table))
+    }
+    getPartitions(hiveTable, spec)
+  }
+
+  private def getPartitions(
+      hiveTable: HiveTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
     val partSpec = spec match {
       case None => CatalogTypes.emptyTablePartitionSpec
       case Some(s) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 71750e6b3a516..8152d36c5cff5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.util.Locale
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
@@ -870,4 +871,54 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("Partition spec is invalid"))
     }
   }
+
+  test("SPARK-35531: Insert data with different cases of bucket column") {
+    withTable("test1") {
+      Seq(true, false).foreach { isHiveTable =>
+        val createSpark = if (isHiveTable) {
+          """
+            |CREATE TABLE TEST1(
+            |v1 BIGINT,
+            |s1 INT)
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+            |STORED AS PARQUET
+          """.stripMargin
+        } else {
+          """
+            |CREATE TABLE test1(
+            |v1 BIGINT,
+            |s1 INT)
+            |USING PARQUET
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+          """.stripMargin
+        }
+
+        val insertString =
+          """
+            |INSERT INTO test1
+            |SELECT * FROM VALUES(1,1,1)
+          """.stripMargin
+
+        val dropString = "DROP TABLE IF EXISTS test1"
+
+        sql(dropString)
+        sql(createSpark.toLowerCase(Locale.ROOT))
+
+        sql(insertString.toLowerCase(Locale.ROOT))
+        sql(insertString.toUpperCase(Locale.ROOT))
+
+        sql(dropString)
+        sql(createSpark.toUpperCase(Locale.ROOT))
+
+        sql(insertString.toLowerCase(Locale.ROOT))
+        sql(insertString.toUpperCase(Locale.ROOT))
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index b5500eaf47158..2dc29eb746fe4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -480,9 +480,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part")))
     }
 
-    test(s"$version: getPartitions(catalogTable)") {
+    test(s"$version: getPartitions(db, table, spec)") {
       assert(testPartitionCount ==
-        client.getPartitions(client.getTable("default", "src_part")).size)
+        client.getPartitions("default", "src_part", None).size)
     }
 
     test(s"$version: getPartitionsByFilter") {