Skip to content

Commit

Permalink
[SPARK-29295][SQL][FOLLOWUP] Dynamic partition map parsed from partit…
Browse files Browse the repository at this point in the history
…ion path should be case insensitive

### What changes were proposed in this pull request?

This is a follow up of #25979.
When we inserting overwrite  an external hive partitioned table with upper case dynamic partition key, exception thrown.

like:
```
org.apache.spark.SparkException: Dynamic partition key P1 is not among written partition paths.
```
The root cause is that Hive metastore is not case preserving and keeps partition columns with lower cased names, see details in:

https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L895-L901
https://github.com/apache/spark/blob/e28914095aa1fa7a4680b5e4fcf69e3ef64b3dbc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L228-L234

In this PR, we convert the dynamic partition map to a case insensitive map.
### Why are the changes needed?

To fix the issue when inserting overwrite into external hive partitioned table with upper case dynamic partition key.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #28765 from turboFei/SPARK-29295-follow-up.

Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
turboFei authored and cloud-fan committed Jun 9, 2020
1 parent de91915 commit 717ec5e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
Expand Down Expand Up @@ -225,9 +226,12 @@ case class InsertIntoHiveTable(
ExternalCatalogUtils.unescapePathName(splitPart(1))
}.toMap

val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)

val updatedPartitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) if dpMap.contains(key) => key -> dpMap(key)
case (key, None) if caseInsensitiveDpMap.contains(key) =>
key -> caseInsensitiveDpMap(key)
case (key, _) =>
throw new SparkException(s"Dynamic partition key $key is not among " +
"written partition paths.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,19 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
assert(e.getMessage.contains("Cannot modify the value of a static config"))
}
}

test("SPARK-29295: dynamic partition map parsed from partition path should be case insensitive") {
withTable("t") {
withSQLConf("hive.exec.dynamic.partition" -> "true",
"hive.exec.dynamic.partition.mode" -> "nonstrict") {
withTempDir { loc =>
sql(s"CREATE TABLE t(c1 INT) PARTITIONED BY(P1 STRING) LOCATION '${loc.getAbsolutePath}'")
sql("INSERT OVERWRITE TABLE t PARTITION(P1) VALUES(1, 'caseSensitive')")
checkAnswer(sql("select * from t"), Row(1, "caseSensitive"))
}
}
}
}
}

class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite
Expand Down

0 comments on commit 717ec5e

Please sign in to comment.