[SPARK-35531][SQL][3.1] Directly pass Hive table to HiveClient when calling getPartitions, to avoid an unnecessary HiveTable -> CatalogTable -> HiveTable conversion

### What changes were proposed in this pull request?
Currently, `HiveExternalCatalog.listPartitions` goes through the following `HiveClient` API:
```
  final def getPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
    getPartitions(getTable(db, table), partialSpec)
  }
```
This calls `getTable` to fetch the raw HiveTable and converts it to a CatalogTable; `getPartitions` then converts it back to a HiveTable.
This round trip causes a conflict: the schema ends up lowercased, but the bucket and sort columns are not lowercased, so they no longer match the schema.

In this PR, the raw HiveTable is passed directly to the HiveClient request, avoiding the unnecessary conversion and the potential conflict, and respecting case sensitivity.
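
As a rough illustration of the conflict (a standalone sketch, not the actual Hive or Spark code; the column names mirror the repro below):

```scala
// Standalone sketch of the failure mode: the schema columns come back lower-cased
// from the catalog, while the bucket columns keep the user's original casing, so a
// case-sensitive membership check (analogous to Hive's Table.setBucketCols) fails.
object BucketColsCheckSketch {
  def main(args: Array[String]): Unit = {
    val schemaCols = Seq("v1", "s1") // lower-cased during HiveTable -> CatalogTable
    val bucketCols = Seq("V1")       // kept exactly as the user typed it
    val missing = bucketCols.filterNot(schemaCols.contains)
    if (missing.nonEmpty) {
      // Mirrors the "Bucket columns ... is not part of the table columns" error below.
      throw new IllegalArgumentException(
        s"Bucket columns ${missing.mkString(", ")} is not part of the table columns $schemaCols")
    }
  }
}
```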

### Why are the changes needed?
When a user creates a Hive bucketed table with an upper-case schema, the table schema is stored in lowercase while the bucket column info keeps the user's original input.

Inserting into such a table then fails with a HiveException reporting that the bucket column is not part of the table schema.

Here is a simple repro:
```
spark.sql("""
  CREATE TABLE TEST1(
    V1 BIGINT,
    S1 INT)
  PARTITIONED BY (PK BIGINT)
  CLUSTERED BY (V1)
  SORTED BY (S1)
  INTO 200 BUCKETS
  STORED AS PARQUET """).show

spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
```
Error message:
```
scala> spark.sql("INSERT INTO TEST1 SELECT * FROM VALUES(1,1,1)").show
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
  at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1242)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1166)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:103)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns V1 is not part of the table columns ([FieldSchema(name:v1, type:bigint, comment:null), FieldSchema(name:s1, type:int, comment:null)]
  at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1082)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$1(HiveClientImpl.scala:732)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:731)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions(HiveClient.scala:222)
  at org.apache.spark.sql.hive.client.HiveClient.getPartitions$(HiveClient.scala:218)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:91)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1245)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
  ... 69 more
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #35475 from AngersZhuuuu/SPARK-35531-3.1.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
AngersZhuuuu authored and cloud-fan committed Feb 10, 2022
1 parent 851ebad commit 4098d26
Showing 4 changed files with 67 additions and 21 deletions.

HiveClient.scala

```diff
@@ -215,20 +215,10 @@ private[hive] trait HiveClient {
   * Returns the partitions for the given table that match the supplied partition spec.
   * If no partition spec is specified, all partitions are returned.
   */
-  final def getPartitions(
+  def getPartitions(
       db: String,
       table: String,
-      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
-    getPartitions(getTable(db, table), partialSpec)
-  }
-
-  /**
-   * Returns the partitions for the given table that match the supplied partition spec.
-   * If no partition spec is specified, all partitions are returned.
-   */
-  def getPartitions(
-      catalogTable: CatalogTable,
-      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
+      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table. */
   def getPartitionsByFilter(
```
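
For orientation, a minimal caller sketch (hypothetical helper; the database, table name, and partition spec values are made up for illustration). The three-argument `(db, table, partialSpec)` signature is kept, only the default implementation that round-tripped through CatalogTable is gone, so existing callers such as `HiveExternalCatalog.listPartitions` (visible in the stack trace above) keep using the same entry point:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.hive.client.HiveClient

// Hypothetical call site sketch (HiveClient is private[hive], so real callers live
// inside the org.apache.spark.sql.hive package); values are illustrative only.
def listTestPartitions(client: HiveClient): Seq[CatalogTablePartition] =
  client.getPartitions("default", "test1", Some(Map("pk" -> "1")))
```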

HiveClientImpl.scala

```diff
@@ -49,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -722,14 +722,19 @@ private[hive] class HiveClientImpl(
     Option(hivePartition).map(fromHivePartition)
   }
 
-  /**
-   * Returns the partitions for the given table that match the supplied partition spec.
-   * If no partition spec is specified, all partitions are returned.
-   */
   override def getPartitions(
-      table: CatalogTable,
+      db: String,
+      table: String,
+      spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
+    val hiveTable = withHiveState {
+      getRawTableOption(db, table).getOrElse(throw new NoSuchTableException(db, table))
+    }
+    getPartitions(hiveTable, spec)
+  }
+
+  private def getPartitions(
+      hiveTable: HiveTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
     val partSpec = spec match {
       case None => CatalogTypes.emptyTablePartitionSpec
       case Some(s) =>
```
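
To spell out the hunk above, here is an illustrative before/after sketch (identifiers are taken from the diff; a simplified outline, not the full method bodies):

```scala
// Before: the table was round-tripped through CatalogTable, which lower-cases the
// schema but not the bucket/sort columns, and toHiveTable then failed inside
// Hive's Table.setBucketCols.
//   val catalogTable = getTable(db, table)                        // HiveTable -> CatalogTable
//   val hiveTable    = toHiveTable(catalogTable, Some(userName))  // CatalogTable -> HiveTable (fails)
//
// After: the raw HiveTable from the metastore is fetched once under withHiveState and
// handed straight to the partition lookup, so no conversion can reintroduce the mismatch.
//   val hiveTable = getRawTableOption(db, table)
//     .getOrElse(throw new NoSuchTableException(db, table))
//   getPartitions(hiveTable, spec)
```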

InsertSuite.scala

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.util.Locale
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
@@ -870,4 +871,54 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("Partition spec is invalid"))
     }
   }
+
+  test("SPARK-35531: Insert data with different cases of bucket column") {
+    withTable("test1") {
+      Seq(true, false).foreach { isHiveTable =>
+        val createSpark = if (isHiveTable) {
+          """
+            |CREATE TABLE TEST1(
+            |v1 BIGINT,
+            |s1 INT)
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+            |STORED AS PARQUET
+          """.stripMargin
+        } else {
+          """
+            |CREATE TABLE test1(
+            |v1 BIGINT,
+            |s1 INT)
+            |USING PARQUET
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+          """.stripMargin
+        }
+
+        val insertString =
+          """
+            |INSERT INTO test1
+            |SELECT * FROM VALUES(1,1,1)
+          """.stripMargin
+
+        val dropString = "DROP TABLE IF EXISTS test1"
+
+        sql(dropString)
+        sql(createSpark.toLowerCase(Locale.ROOT))
+
+        sql(insertString.toLowerCase(Locale.ROOT))
+        sql(insertString.toUpperCase(Locale.ROOT))
+
+        sql(dropString)
+        sql(createSpark.toUpperCase(Locale.ROOT))
+
+        sql(insertString.toLowerCase(Locale.ROOT))
+        sql(insertString.toUpperCase(Locale.ROOT))
+      }
+    }
+  }
 }
```

VersionsSuite.scala

```diff
@@ -480,9 +480,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part")))
     }
 
-    test(s"$version: getPartitions(catalogTable)") {
+    test(s"$version: getPartitions(db, table, spec)") {
       assert(testPartitionCount ==
-        client.getPartitions(client.getTable("default", "src_part")).size)
+        client.getPartitions("default", "src_part", None).size)
     }
 
     test(s"$version: getPartitionsByFilter") {
```
