[SPARK-47871][SQL] Oracle: Map TimestampType to TIMESTAMP WITH LOCAL TIME ZONE

### What changes were proposed in this pull request?

This PR maps TimestampType to Oracle's TIMESTAMP WITH LOCAL TIME ZONE.
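
In practice, the new mapping is visible through the public JDBC dialect API. A minimal sketch under the new default (the Oracle URL below is a placeholder):

```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}

// Resolve the Oracle dialect from a (placeholder) JDBC URL.
val oracle = JdbcDialects.get("jdbc:oracle:thin:@//host:1521/xe")

// New default: TimestampType maps to the zone-aware Oracle type.
oracle.getJDBCType(TimestampType).map(_.databaseTypeDefinition)
// => Some("TIMESTAMP WITH LOCAL TIME ZONE")

// TimestampNTZType continues to map to plain TIMESTAMP.
oracle.getJDBCType(TimestampNTZType).map(_.databaseTypeDefinition)
// => Some("TIMESTAMP")
```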

### Why are the changes needed?

We currently map both TimestampType and TimestampNTZType to Oracle's TIMESTAMP, which represents a timestamp without time zone. This is ambiguous: from the Oracle schema alone, a reader cannot tell whether the stored values are time-zone aware or plain wall-clock timestamps.
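
For example, both DataFrames below used to produce the same Oracle column type on write, so the Spark-side distinction was lost. A hedged sketch, assuming an active `SparkSession` named `spark`:

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import spark.implicits._

// java.sql.Timestamp encodes as TimestampType (session-time-zone aware).
val tz = Seq(Timestamp.valueOf("2018-11-17 13:33:33")).toDF("ts")

// java.time.LocalDateTime encodes as TimestampNTZType (wall-clock time).
val ntz = Seq(LocalDateTime.parse("2018-11-17T13:33:33")).toDF("ts")

// Previously both wrote an Oracle TIMESTAMP column; with this PR,
// `tz` writes TIMESTAMP WITH LOCAL TIME ZONE and `ntz` keeps TIMESTAMP.
```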

### Does this PR introduce _any_ user-facing change?

It does not affect Spark users performing a TimestampType read-write-read round trip, but it might affect how other systems read the columns Spark writes.
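
Users who depend on the old DDL can opt back in with the legacy flag this PR adds (see the migration-guide and SQLConf diffs below); a sketch:

```scala
// Restore the Spark 3.5 mapping (TimestampType -> TIMESTAMP) for this session.
spark.conf.set("spark.sql.legacy.oracle.timestampMapping.enabled", "true")
```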

### How was this patch tested?

Existing test, re-run with the new configuration:
```
SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE (9 seconds, 536 milliseconds)
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46080 from yaooqinn/SPARK-47871.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
yaooqinn authored and dongjoon-hyun committed Apr 16, 2024
1 parent ee2673f commit 9a1fc11
Showing 5 changed files with 43 additions and 19 deletions.

```diff
@@ -547,23 +547,28 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
   }
 
   test("SPARK-42627: Support ORACLE TIMESTAMP WITH LOCAL TIME ZONE") {
-    val reader = spark.read.format("jdbc")
-      .option("url", jdbcUrl)
-      .option("dbtable", "test_ltz")
-    val df = reader.load()
-    val row1 = df.collect().head.getTimestamp(0)
-    assert(df.count() === 1)
-    assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33"))
-
-    df.write.format("jdbc")
-      .option("url", jdbcUrl)
-      .option("dbtable", "test_ltz")
-      .mode("append")
-      .save()
-
-    val df2 = reader.load()
-    assert(df.count() === 2)
-    assert(df2.collect().forall(_.getTimestamp(0) === row1))
+    Seq("true", "false").foreach { flag =>
+      withSQLConf((SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key, flag)) {
+        val df = spark.read.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz")
+          .load()
+        val row1 = df.collect().head.getTimestamp(0)
+        assert(df.count() === 1)
+        assert(row1 === Timestamp.valueOf("2018-11-17 13:33:33"))
+
+        df.write.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz" + flag)
+          .save()
+
+        val df2 = spark.read.format("jdbc")
+          .option("url", jdbcUrl)
+          .option("dbtable", "test_ltz" + flag)
+          .load()
+        checkAnswer(df2, Row(row1))
+      }
+    }
   }
 
   test("SPARK-47761: Reading ANSI INTERVAL Types") {
```

docs/sql-migration-guide.md (1 change: 1 addition & 0 deletions)

```diff
@@ -45,6 +45,7 @@ license: |
 - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
 - Since Spark 4.0, MySQL JDBC datasource will read BIT(n > 1) as BinaryType, while in Spark 3.5 and previous, read as LongType. To restore the previous behavior, set `spark.sql.legacy.mysql.bitArrayMapping.enabled` to `true`.
 - Since Spark 4.0, MySQL JDBC datasource will write ShortType as SMALLINT, while in Spark 3.5 and previous, write as INTEGER. To restore the previous behavior, you can replace the column with IntegerType whenever before writing.
+- Since Spark 4.0, Oracle JDBC datasource will write TimestampType as TIMESTAMP WITH LOCAL TIME ZONE, while in Spark 3.5 and previous, write as TIMESTAMP. To restore the previous behavior, set `spark.sql.legacy.oracle.timestampMapping.enabled` to `true`.
 - Since Spark 4.0, The default value for `spark.sql.legacy.ctePrecedencePolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an error, inner CTE definitions take precedence over outer definitions.
 - Since Spark 4.0, The default value for `spark.sql.legacy.timeParserPolicy` has been changed from `EXCEPTION` to `CORRECTED`. Instead of raising an `INCONSISTENT_BEHAVIOR_CROSS_VERSION` error, `CANNOT_PARSE_TIMESTAMP` will be raised if ANSI mode is enable. `NULL` will be returned if ANSI mode is disabled. See [Datetime Patterns for Formatting and Parsing](sql-ref-datetime-pattern.html).
 
```

```diff
@@ -4135,6 +4135,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED =
+    buildConf("spark.sql.legacy.oracle.timestampMapping.enabled")
+      .internal()
+      .doc("When true, TimestampType maps to TIMESTAMP in Oracle; otherwise, " +
+        "TIMESTAMP WITH LOCAL TIME ZONE.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
     .doc("When true, enable filter pushdown to CSV datasource.")
     .version("3.0.0")
@@ -5235,6 +5244,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def legacyMySqlBitArrayMappingEnabled: Boolean =
     getConf(LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED)
 
+  def legacyOracleTimestampMappingEnabled: Boolean =
+    getConf(LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED)
+
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
     LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
```

```diff
@@ -23,13 +23,14 @@ import java.util.Locale
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.jdbc.OracleDialect._
 import org.apache.spark.sql.types._
 
 
-private case class OracleDialect() extends JdbcDialect {
+private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
 
@@ -120,6 +121,8 @@ private case class OracleDialect() extends JdbcDialect {
     case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.SMALLINT))
     case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.SMALLINT))
     case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
+    case TimestampType if !conf.legacyOracleTimestampMappingEnabled =>
+      Some(JdbcType("TIMESTAMP WITH LOCAL TIME ZONE", TIMESTAMP_LTZ))
     case _ => None
   }
 
```

```diff
@@ -1335,7 +1335,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     assert(getJdbcType(oracleDialect, StringType) == "VARCHAR2(255)")
     assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
     assert(getJdbcType(oracleDialect, DateType) == "DATE")
-    assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP WITH LOCAL TIME ZONE")
+    withSQLConf(SQLConf.LEGACY_ORACLE_TIMESTAMP_MAPPING_ENABLED.key -> "true") {
+      assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    }
     assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP")
   }
 
```
