SPARK-50840. Fix the timestamp parsing failure on Hive table metadata reload when the timestamp alias points to NTZ
ashahid committed Jan 16, 2025
1 parent bd56285 commit 383ce07
Showing 3 changed files with 98 additions and 7 deletions.
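
The root cause, in brief: since the spark.sql.timestampType conf was introduced, the bare `timestamp` alias parses to whichever type the conf selects, while a Hive TIMESTAMP column is always LTZ. A minimal sketch of that alias behavior, not part of this commit; it assumes Spark's internal SQLConf.withExistingConf helper to pin the thread-local conf:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}

object TimestampAliasSketch extends App {
  val conf = new SQLConf
  conf.setConfString(SQLConf.TIMESTAMP_TYPE.key, "TIMESTAMP_LTZ")
  SQLConf.withExistingConf(conf) {
    // With the LTZ default, the bare "timestamp" alias resolves to TimestampType.
    assert(CatalystSqlParser.parseDataType("timestamp") == TimestampType)
  }
  conf.setConfString(SQLConf.TIMESTAMP_TYPE.key, "TIMESTAMP_NTZ")
  SQLConf.withExistingConf(conf) {
    // Under NTZ the same Hive column text parses to TimestampNTZType, which is
    // the mismatch this commit rectifies when reloading table metadata.
    assert(CatalystSqlParser.parseDataType("timestamp") == TimestampNTZType)
  }
}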
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -45,8 +45,8 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -122,7 +122,36 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* before returning it.
*/
private[hive] def getRawTable(db: String, table: String): CatalogTable = {
-    client.getTable(db, table)
+
+    def convertNTZToLTZ(st: StructType): StructType = {
+      val rectifiedFields = st.map(sf => sf.dataType match {
+        case TimestampNTZType
+            if sf.metadata.contains(HiveClientImpl.HIVE_DATA_TYPE_KEY) &&
+              HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP == sf.metadata.getString(
+                HiveClientImpl.HIVE_DATA_TYPE_KEY) =>
+          val newMd = new MetadataBuilder()
+            .withMetadata(sf.metadata)
+            .remove(HiveClientImpl.HIVE_DATA_TYPE_KEY)
+            .build()
+          sf.copy(dataType = TimestampType, metadata = newMd)
+
+        case structType: StructType => sf.copy(dataType = convertNTZToLTZ(structType))
+
+        case _ => sf
+      })
+      StructType(rectifiedFields)
+    }
+
+    val ct = client.getTable(db, table)
+    // Since we are in HiveExternalCatalog and Hive does not support TIMESTAMP_NTZ, even if
+    // this Spark engine converted the timestamp type coming from the Hive Metastore to NTZ,
+    // we need to convert it back to TimestampType.
+    if (SQLConf.get.timestampType == TimestampNTZType) {
+      val rectifiedSchema = convertNTZToLTZ(ct.schema)
+      ct.copy(schema = rectifiedSchema)
+    } else {
+      ct
+    }
}

private[hive] def getRawTablesByNames(db: String, tables: Seq[String]): Seq[CatalogTable] = {
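For intuition, a tiny standalone re-statement of the recursive walk in convertNTZToLTZ above; the names here are illustrative only, and the real method additionally requires the hiveDataType marker before rewriting a field:

import org.apache.spark.sql.types._

object ConvertNtzSketch extends App {
  // Simplified mirror of convertNTZToLTZ: rewrite NTZ fields to LTZ, recursing
  // into nested structs. The patch's version also checks the metadata marker.
  def ntzToLtz(st: StructType): StructType = StructType(st.map { sf =>
    sf.dataType match {
      case TimestampNTZType => sf.copy(dataType = TimestampType)
      case nested: StructType => sf.copy(dataType = ntzToLtz(nested))
      case _ => sf
    }
  })

  val loaded = StructType(Seq(
    StructField("ts", TimestampNTZType),
    StructField("nstd", StructType(Seq(StructField("ts1", TimestampNTZType))))))
  // Prints a schema where both ts and nstd.ts1 are back to TimestampType.
  println(ntzToLtz(loaded).treeString)
}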
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -55,7 +55,9 @@ import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, N
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.parser.{AstBuilder, CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComplexColTypeContext
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1090,6 +1092,9 @@ private[hive] class HiveClientImpl(
}

private[hive] object HiveClientImpl extends Logging {
+  val HIVE_DATA_TYPE_KEY = "hiveDataType"
+  val HIVE_DATA_TYPE_TIME_STAMP = "timestamp"
+
/** Converts the native StructField to Hive's FieldSchema. */
def toHiveColumn(c: StructField): FieldSchema = {
// For Hive Serde, we still need to restore the raw type for char and varchar type.
@@ -1117,7 +1122,7 @@ private[hive] object HiveClientImpl extends Logging {
// map<string,struct<x:int,y.z:int>> -> map<string,struct<`x`:int,`y.z`:int>>
val typeStr = hc.getType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
try {
-      CatalystSqlParser.parseDataType(typeStr)
+      HiveCatalystSqLParser.parseDataType(typeStr)
} catch {
case e: ParseException =>
throw QueryExecutionErrors.cannotRecognizeHiveTypeError(e, typeStr, hc.getName)
@@ -1127,13 +1132,31 @@
/** Builds the native StructField from Hive's FieldSchema. */
def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = getSparkSQLDataType(hc)
+    // Since we are loading the metadata of an existing table, and Hive's plain `timestamp`
+    // column type maps to Spark's timestamp_ltz, the column must ultimately be treated as
+    // Spark's TimestampType (timestamp_ltz) even if Spark's timestamp alias currently points
+    // to NTZ, so we record a marker here for getRawTable to rectify it. Once the Spark-to-Hive
+    // type mapping is actually rectified, this fix needs to be revisited.
+    val metadata = getMetadataForNtz(columnType, hc.getType)
val field = StructField(
name = hc.getName,
dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
Option(hc.getComment).map(field.withComment).getOrElse(field)
}

+  def getMetadataForNtz(
+      colType: DataType,
+      hiveType: String,
+      existingMd: Metadata = Metadata.empty): Metadata =
+    colType match {
+      case TimestampNTZType if hiveType == HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP =>
+        new MetadataBuilder().withMetadata(existingMd).putString(HIVE_DATA_TYPE_KEY, hiveType).build()
+
+      case _ => existingMd
+    }
+
private def verifyColumnDataType(schema: StructType): Unit = {
schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
}
@@ -1421,3 +1444,13 @@ private[hive] object HiveClientImpl extends Logging {
}
}
}
+
+object HiveCatalystSqLParser extends CatalystSqlParser {
+  override val astBuilder = new AstBuilder {
+    override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
+      val sf = super.visitComplexColType(ctx)
+      val md = HiveClientImpl.getMetadataForNtz(sf.dataType, ctx.dataType().getText, sf.metadata)
+      sf.copy(metadata = md)
+    }
+  }
+}
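
Why the parser override is needed: fromHiveColumn can only tag the top-level column type, while fields nested inside a struct<...> are built by the parser's visitComplexColType, so the marker has to be attached there as well. A hedged sketch of the effect, assuming the new object lands in org.apache.spark.sql.hive.client (this file's package) and that spark.sql.timestampType is set to TIMESTAMP_NTZ, since getMetadataForNtz is a no-op otherwise:

import org.apache.spark.sql.hive.client.HiveCatalystSqLParser // object added above
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object NestedMarkerSketch extends App {
  val conf = new SQLConf
  conf.setConfString(SQLConf.TIMESTAMP_TYPE.key, "TIMESTAMP_NTZ")
  SQLConf.withExistingConf(conf) {
    val st = HiveCatalystSqLParser.parseDataType("struct<ts1:timestamp>")
      .asInstanceOf[StructType]
    // ts1 still parses as TimestampNTZType here, but now carries the marker
    // {"hiveDataType": "timestamp"} that getRawTable later uses to restore
    // TimestampType (timestamp_ltz).
    println(st("ts1").metadata)
  }
}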
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
+import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION, TimestampTypes}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -3383,4 +3383,33 @@ class HiveDDLSuite
checkAnswer(sql("SELECT * FROM t1"), Row(0))
}
}
+
+  test("SPARK-50840: Hive table created with timestamp LTZ should retain the same type on reload") {
+    withTable("t1", "t2") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_LTZ.toString) {
+        val tblDef =
+          s"""
+             |CREATE TABLE t1 (
+             |  ts timestamp,
+             |  nstd Struct<name: String, ts1: timestamp>
+             |)
+             |using parquet""".stripMargin
+        sql(tblDef)
+      }
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_NTZ.toString) {
+        def assertNoTimestampNTZ(structType: StructType): Unit = {
+          structType.foreach {
+            _.dataType match {
+              case TimestampNTZType => fail("TimestampNTZType not expected")
+              case st: StructType => assertNoTimestampNTZ(st)
+              case _ =>
+            }
+          }
+        }
+
+        sql("alter table t1 rename to t2")
+        assertNoTimestampNTZ(spark.table("t2").schema)
+      }
+    }
+  }
}
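
For context, a spark-shell style sketch of the end-to-end scenario the test covers; the session setup is hypothetical and requires Hive support on the classpath:

import org.apache.spark.sql.SparkSession

object ReloadScenarioSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .enableHiveSupport()
    .config("spark.sql.timestampType", "TIMESTAMP_LTZ")
    .getOrCreate()

  spark.sql("CREATE TABLE t1 (ts timestamp) USING parquet")
  // Flip the alias to NTZ, then force a metadata reload; before this commit
  // the reloaded column came back as timestamp_ntz.
  spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
  spark.sql("ALTER TABLE t1 RENAME TO t2")
  spark.table("t2").printSchema() // ts: timestamp (LTZ), not timestamp_ntz
}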
