diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index d442087721cbb..98fc9faca422c 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -93,6 +93,8 @@ displayTitle: Spark SQL Upgrading Guide
 
 - Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use java.time API for calculation week number of year and day number of week based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) is used for the same purpose. Results of the functions returned by Spark 3.0 and previous versions can be different for dates before October 15, 1582 (Gregorian).
 
+ - Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on Proleptic Gregorian calendar, and time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on default system time zone.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
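To illustrate the migration note above: with this change, `lowerBound`/`upperBound` strings for a timestamp partition column are parsed with the same patterns accepted by a string-to-timestamp cast and interpreted in `spark.sql.session.timeZone`. Below is a minimal sketch (not part of this patch) of such a partitioned read; it assumes an active `spark` session, and the URL, table, and column names are placeholders.

```scala
// Sketch only: a partitioned JDBC read over a timestamp column.
// The bound strings use cast-style patterns (optional 'T' separator,
// fractional seconds) and, since Spark 3.0, are interpreted in the
// session time zone rather than the JVM default.
spark.conf.set("spark.sql.session.timeZone", "UTC")

val events = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")  // placeholder URL
  .option("dbtable", "public.events")                   // placeholder table
  .option("partitionColumn", "event_time")              // placeholder timestamp column
  .option("lowerBound", "2019-01-20T00:00:00.123456")
  .option("upperBound", "2019-01-21 00:00:00")
  .option("numPartitions", 4)
  .load()
```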
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 13ed105004d70..c0f78b5273885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Date, Timestamp}
-
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
@@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Instructions on how to partition the table among workers.
@@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging {
         val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
           schema, partitionColumn.get, resolver, jdbcOptions)
 
-        val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
-        val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+        val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
+        val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
         JDBCPartitioningInfo(
           column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
       }
@@ -174,10 +174,21 @@ private[sql] object JDBCRelation extends Logging {
     (dialect.quoteIdentifier(column.name), column.dataType)
   }
 
-  private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
-    case _: NumericType => value.toLong
-    case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
-    case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+  private def toInternalBoundValue(
+      value: String,
+      columnType: DataType,
+      timeZoneId: String): Long = {
+    def parse[T](f: UTF8String => Option[T]): T = {
+      f(UTF8String.fromString(value)).getOrElse {
+        throw new IllegalArgumentException(
+          s"Cannot parse the bound value $value as ${columnType.catalogString}")
+      }
+    }
+    columnType match {
+      case _: NumericType => value.toLong
+      case DateType => parse(stringToDate).toLong
+      case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId)))
+    }
   }
 
   private def toBoundValueInWhereClause(
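The rewritten `toInternalBoundValue` above funnels both bounds through the catalyst parsers, which return `Option` and therefore need the explicit failure branch. As a rough standalone sketch of the same parse-or-fail pattern, using the Spark-internal `DateTimeUtils` helpers exactly as the hunk does (the helper name `parseTimestampBound` is illustrative only, not part of Spark's API; these internal signatures may differ in other Spark versions):

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToTimestamp}
import org.apache.spark.unsafe.types.UTF8String

// Turn a bound string into Catalyst's internal Long (microseconds since epoch),
// failing fast when the string does not match a supported cast pattern.
// Illustrative helper; Spark keeps this logic private in JDBCRelation.
def parseTimestampBound(value: String, timeZoneId: String): Long =
  stringToTimestamp(UTF8String.fromString(value), getTimeZone(timeZoneId))
    .getOrElse(throw new IllegalArgumentException(
      s"Cannot parse the bound value $value as timestamp"))

// e.g. parseTimestampBound("2019-01-20T00:00:00.123456", "UTC")
```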
" + "Check the URL: jdbc:mysql://localhost/db")) } + + test("support casting patterns for lower/upper bounds of TimestampType") { + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + Seq( + ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"), + ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"), + ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456", + "2019-01-20T00:10:00.123456"), + ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456") + ).foreach { case (lower, middle, upper) => + val df = spark.read.format("jdbc") + .option("url", urlWithUserAndPass) + .option("dbtable", "TEST.DATETIME") + .option("partitionColumn", "t") + .option("lowerBound", lower) + .option("upperBound", upper) + .option("numPartitions", 2) + .load() + + df.logicalPlan match { + case lr: LogicalRelation if lr.relation.isInstanceOf[JDBCRelation] => + val jdbcRelation = lr.relation.asInstanceOf[JDBCRelation] + val whereClauses = jdbcRelation.parts.map(_.asInstanceOf[JDBCPartition].whereClause) + assert(whereClauses.toSet === Set( + s""""T" < '$middle' or "T" is null""", + s""""T" >= '$middle'""")) + } + } + } + } + } }