[SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds #23597

Closed
wants to merge 11 commits
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Date, Timestamp}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
@@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

/**
* Instructions on how to partition the table among workers.
@@ -174,10 +174,19 @@ private[sql] object JDBCRelation extends Logging {
(dialect.quoteIdentifier(column.name), column.dataType)
}

private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
case _: NumericType => value.toLong
case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
private def toInternalBoundValue(value: String, columnType: DataType): Long = {
Contributor: we should pass in the timezone id, just like what we did for toBoundValueInWhereClause

Member Author: done

def parse[T](f: UTF8String => Option[T]): T = {
f(UTF8String.fromString(value)).getOrElse {
throw new IllegalArgumentException(
s"Cannot parse the bound value $value as ${columnType.catalogString}")
}
}
columnType match {
case _: NumericType => value.toLong
case DateType => parse(stringToDate).toLong
case TimestampType =>
parse(stringToTimestamp(_, getTimeZone(SQLConf.get.sessionLocalTimeZone)))
Member: We decide to adjust the timestamp constants based on the user-specified local timezone?

Contributor: Previously we called Timestamp.valueOf(value), which uses the JVM local timezone. It seems to me that using the Spark session timezone is better.

Contributor: Actually we have to do it. This is a follow-up of #23391, which changed how we turn the timestamp boundaries into strings. Here we change how we turn strings into timestamps.

Member: This is a behavior change. We need to clearly document which inputs start respecting the Spark session local timezone.

Member (@HyukjinKwon, Jan 22, 2019): How about we mention something like: all string -> timestamp conversions (JDBC lower/upper bounds, etc.) will respect the session timezone, and Java 8 time will be used consistently across the codebase, once the sub-tasks in the umbrella are resolved?

Contributor: I agree that we should improve the migration guide. Switching to the Proleptic Gregorian calendar is a behavior change in many places; it would be better to list all of them in the migration guide.

}
}

private def toBoundValueInWhereClause(
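To make the behavior change discussed in the review thread above concrete, here is a minimal standalone sketch, not part of the PR. It contrasts the old parsing path, which goes through java.sql.Timestamp.valueOf and therefore the JVM default timezone, with parsing the same bound string against an explicitly chosen timezone, which is what the new code achieves via Spark's session timezone. The zone id "America/Los_Angeles" is an arbitrary stand-in for whatever spark.sql.session.timeZone is set to.

import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object BoundParsingSketch {
  def main(args: Array[String]): Unit = {
    val bound = "2019-01-20 12:00:00"

    // Old path: Timestamp.valueOf interprets the string in the JVM default timezone.
    val jvmLocal: Timestamp = Timestamp.valueOf(bound)

    // New idea: interpret the same string in an explicit (session) timezone.
    // "America/Los_Angeles" stands in for the configured spark.sql.session.timeZone.
    val sessionZone = ZoneId.of("America/Los_Angeles")
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val inSession = LocalDateTime.parse(bound, formatter).atZone(sessionZone).toInstant

    // The two instants differ whenever the JVM default timezone differs from the session timezone.
    println(s"JVM default zone interpretation: ${jvmLocal.toInstant}")
    println(s"Session zone interpretation:     $inSession")
  }
}

Running the sketch on a machine whose JVM default timezone is not America/Los_Angeles yields two different instants for the same bound string, which is exactly why the choice of timezone used to parse lowerBound/upperBound matters.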
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1523,4 +1523,37 @@ class JDBCSuite extends QueryTest
assert(e.contains("The driver could not open a JDBC connection. " +
"Check the URL: jdbc:mysql://localhost/db"))
}

test("parsing timestamp bounds") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
Seq(
("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"),
("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"),
(
"2019-01-20T00:00:00.123456",
"2019-01-20 00:05:00.123456",
"2019-01-20T00:10:00.123456"),
("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456")
).foreach { case (lower, middle, upper) =>
val df = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", "TEST.DATETIME")
.option("partitionColumn", "t")
.option("lowerBound", lower)
.option("upperBound", upper)
.option("numPartitions", 2)
.load()

df.logicalPlan match {
case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
assert(whereClauses === Set(
s""""T" < '$middle' or "T" is null""",
s""""T" >= '$middle'"""))
}
}
}
}
}
}
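For context, here is a hedged usage sketch of how these bounds surface to users, similar to the test above; the JDBC URL, table name, and column name are placeholders, and the session timezone value is arbitrary. After this change, the lowerBound/upperBound strings for a timestamp partition column are interpreted in spark.sql.session.timeZone rather than the JVM default timezone.

import org.apache.spark.sql.SparkSession

object JdbcTimestampBoundsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-timestamp-bounds")
      .master("local[*]")
      // After this change, the bound strings below are interpreted in this session timezone
      // instead of the JVM default timezone.
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()

    // The URL, table, and column names are placeholders for illustration only.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")
      .option("dbtable", "TEST.DATETIME")
      .option("partitionColumn", "t")
      .option("lowerBound", "2019-01-20 00:00:00")
      .option("upperBound", "2019-01-21 00:00:00")
      .option("numPartitions", 4)
      .load()

    df.show()
    spark.stop()
  }
}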