From 51a6ba0181a013f2b62b47184785a8b6f6a78f12 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 11 Jan 2019 08:53:12 -0600 Subject: [PATCH] [SPARK-26503][CORE] Get rid of spark.sql.legacy.timeParser.enabled ## What changes were proposed in this pull request? Per discussion in #23391 (comment) this proposes to just remove the old pre-Spark-3 time parsing behavior. This is a rebase of https://github.com/apache/spark/pull/23411 ## How was this patch tested? Existing tests. Closes #23495 from srowen/SPARK-26503.2. Authored-by: Sean Owen Signed-off-by: Sean Owen --- .../sql/catalyst/util/DateFormatter.scala | 43 +--- .../catalyst/util/TimestampFormatter.scala | 37 +--- .../apache/spark/sql/internal/SQLConf.scala | 9 - .../catalyst/json/JsonInferSchemaSuite.scala | 77 +++----- .../datasources/json/JsonSuite.scala | 183 ++++-------------- .../sql/sources/HadoopFsRelationTest.scala | 107 +++++----- 6 files changed, 120 insertions(+), 336 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index db92552814883..c47b08729c4e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -20,12 +20,6 @@ package org.apache.spark.sql.catalyst.util import java.time.{Instant, ZoneId} import java.util.Locale -import scala.util.Try - -import org.apache.commons.lang3.time.FastDateFormat - -import org.apache.spark.sql.internal.SQLConf - sealed trait DateFormatter extends Serializable { def parse(s: String): Int // returns days since epoch def format(days: Int): String @@ -56,43 +50,8 @@ class Iso8601DateFormatter( } } -class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { - @transient - private lazy val format = FastDateFormat.getInstance(pattern, locale) - - override def parse(s: String): Int = { - val milliseconds = format.parse(s).getTime - DateTimeUtils.millisToDays(milliseconds) - } - - override def format(days: Int): String = { - val date = DateTimeUtils.toJavaDate(days) - format.format(date) - } -} - -class LegacyFallbackDateFormatter( - pattern: String, - locale: Locale) extends LegacyDateFormatter(pattern, locale) { - override def parse(s: String): Int = { - Try(super.parse(s)).orElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime)) - }.getOrElse { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. 
- s.toInt - } - } -} - object DateFormatter { def apply(format: String, locale: Locale): DateFormatter = { - if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackDateFormatter(format, locale) - } else { - new Iso8601DateFormatter(format, locale) - } + new Iso8601DateFormatter(format, locale) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 8042099e5a92e..10c73b2f99558 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -23,12 +23,6 @@ import java.time.format.DateTimeParseException import java.time.temporal.TemporalQueries import java.util.{Locale, TimeZone} -import scala.util.Try - -import org.apache.commons.lang3.time.FastDateFormat - -import org.apache.spark.sql.internal.SQLConf - sealed trait TimestampFormatter extends Serializable { /** * Parses a timestamp in a string and converts it to microseconds. @@ -79,37 +73,8 @@ class Iso8601TimestampFormatter( } } -class LegacyTimestampFormatter( - pattern: String, - timeZone: TimeZone, - locale: Locale) extends TimestampFormatter { - @transient - private lazy val format = FastDateFormat.getInstance(pattern, timeZone, locale) - - protected def toMillis(s: String): Long = format.parse(s).getTime - - override def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS - - override def format(us: Long): String = { - format.format(DateTimeUtils.toJavaTimestamp(us)) - } -} - -class LegacyFallbackTimestampFormatter( - pattern: String, - timeZone: TimeZone, - locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) { - override def toMillis(s: String): Long = { - Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) - } -} - object TimestampFormatter { def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = { - if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackTimestampFormatter(format, timeZone, locale) - } else { - new Iso8601TimestampFormatter(format, timeZone, locale) - } + new Iso8601TimestampFormatter(format, timeZone, locale) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9804af7dff179..c1b885a72ad3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1625,13 +1625,6 @@ object SQLConf { "a SparkConf entry.") .booleanConf .createWithDefault(true) - - val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled") - .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " + - " dates/timestamps in a locale-sensitive manner. When set to false, classes from " + - "java.time.* packages are used for the same purpose.") - .booleanConf - .createWithDefault(false) } /** @@ -2057,8 +2050,6 @@ class SQLConf extends Serializable with Logging { def setCommandRejectsSparkCoreConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS) - def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 9a6f4f5f9b0cb..8ce45f06ba65d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.plans.SQLHelper -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { @@ -43,61 +42,45 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("inferring timestamp type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - checkTimestampType("yyyy", """{"a": "2018"}""") - checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") - checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") - checkTimestampType( - "yyyy-MM-dd'T'HH:mm:ss.SSS", - """{"a": "2018-12-02T21:04:00.123"}""") - checkTimestampType( - "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX", - """{"a": "2018-12-02T21:04:00.123567+01:00"}""") - } - } + checkTimestampType("yyyy", """{"a": "2018"}""") + checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") + checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSS", + """{"a": "2018-12-02T21:04:00.123"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX", + """{"a": "2018-12-02T21:04:00.123567+01:00"}""") } test("prefer decimals over timestamps") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - checkType( - options = Map( - "prefersDecimal" -> "true", - "timestampFormat" -> "yyyyMMdd.HHmmssSSS" - ), - json = """{"a": "20181202.210400123"}""", - dt = DecimalType(17, 9) - ) - } - } + checkType( + options = Map( + "prefersDecimal" -> "true", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" + ), + json = """{"a": "20181202.210400123"}""", + dt = DecimalType(17, 9) + ) } test("skip decimal type inferring") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - checkType( - options = Map( - "prefersDecimal" -> "false", - "timestampFormat" -> "yyyyMMdd.HHmmssSSS" - ), - json = """{"a": "20181202.210400123"}""", - dt = TimestampType - ) - } - } + checkType( + options = Map( + "prefersDecimal" -> "false", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" + ), + json = """{"a": "20181202.210400123"}""", + dt = TimestampType + ) } test("fallback to string type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - checkType( - options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), - json = """{"a": "20181202.210400123"}""", - dt = StringType - ) - } - } + checkType( + options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), + json = """{"a": "20181202.210400123"}""", + dt = StringType + ) } test("disable timestamp inferring") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8f575a371c98e..78debb5731116 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1451,109 +1451,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { }) } - test("backward compatibility") { - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { - // This test we make sure our JSON support can read JSON data generated by previous version - // of Spark generated through toJSON method and JSON data source. - // The data is generated by the following program. - // Here are a few notes: - // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) - // in the JSON object. - // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to - // JSON objects generated by those Spark versions (col17). - // - If the type is NullType, we do not write data out. - - // Create the schema. - val struct = - StructType( - StructField("f1", FloatType, true) :: - StructField("f2", ArrayType(BooleanType), true) :: Nil) - - val dataTypes = - Seq( - StringType, BinaryType, NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct, - new TestUDT.MyDenseVectorUDT()) - val fields = dataTypes.zipWithIndex.map { case (dataType, index) => - StructField(s"col$index", dataType, nullable = true) - } - val schema = StructType(fields) - - val constantValues = - Seq( - "a string in binary".getBytes(StandardCharsets.UTF_8), - null, - true, - 1.toByte, - 2.toShort, - 3, - Long.MaxValue, - 0.25.toFloat, - 0.75, - new java.math.BigDecimal(s"1234.23456"), - new java.math.BigDecimal(s"1.23456"), - java.sql.Date.valueOf("2015-01-01"), - java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), - Seq(2, 3, 4), - Map("a string" -> 2000L), - Row(4.75.toFloat, Seq(false, true)), - new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25))) - val data = - Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil - - // Data generated by previous versions. 
- // scalastyle:off - val existingJSONData = - """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil - // scalastyle:on - - // Generate data for the current version. - val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) - withTempPath { path => - df.write.format("json").mode("overwrite").save(path.getCanonicalPath) - - // df.toJSON will convert internal rows to external rows first and then generate - // JSON objects. While, df.write.format("json") will write internal rows directly. - val allJSON = - existingJSONData ++ - df.toJSON.collect() ++ - sparkContext.textFile(path.getCanonicalPath).collect() - - Utils.deleteRecursively(path) - sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) - - // Read data back with the schema specified. 
- val col0Values = - Seq( - "Spark 1.2.2", - "Spark 1.3.1", - "Spark 1.3.1", - "Spark 1.4.1", - "Spark 1.4.1", - "Spark 1.5.0", - "Spark 1.5.0", - "Spark " + spark.sparkContext.version, - "Spark " + spark.sparkContext.version) - val expectedResult = col0Values.map { v => - Row.fromSeq(Seq(v) ++ constantValues) - } - checkAnswer( - spark.read.format("json").schema(schema).load(path.getCanonicalPath), - expectedResult - ) - } - } - } - test("SPARK-11544 test pathfilter") { withTempPath { dir => val path = dir.getCanonicalPath @@ -2592,53 +2489,45 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("inferring timestamp type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema - - assert(schemaOf( - """{"a":"2018-12-17T10:11:12.123-01:00"}""", - """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) - - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") - === fromDDL("a string")) - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") - === fromDDL("a string")) - - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") - === fromDDL("a timestamp")) - assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") - === fromDDL("a timestamp")) - } - } + def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema + + assert(schemaOf( + """{"a":"2018-12-17T10:11:12.123-01:00"}""", + """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") + === fromDDL("a string")) + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") + === fromDDL("a string")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") + === fromDDL("a timestamp")) + assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") + === fromDDL("a timestamp")) } test("roundtrip for timestamp type inferring") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { - val customSchema = new StructType().add("date", TimestampType) - withTempDir { dir => - val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" - val timestampsWithFormat = spark.read - .option("timestampFormat", "dd/MM/yyyy HH:mm") - .json(datesRecords) - assert(timestampsWithFormat.schema === customSchema) - - timestampsWithFormat.write - .format("json") - .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") - .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .save(timestampsWithFormatPath) - - val readBack = spark.read - .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") - .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .json(timestampsWithFormatPath) - - assert(readBack.schema === customSchema) - checkAnswer(readBack, timestampsWithFormat) - } - } + val customSchema = new StructType().add("date", TimestampType) + withTempDir { dir => + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .json(datesRecords) + assert(timestampsWithFormat.schema === customSchema) + + timestampsWithFormat.write + .format("json") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + 
.option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .save(timestampsWithFormatPath) + + val readBack = spark.read + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .json(timestampsWithFormatPath) + + assert(readBack.schema === customSchema) + checkAnswer(readBack, timestampsWithFormat) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 57b896612bfe0..bf6d0ea5788dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources import java.io.File -import java.util.TimeZone import scala.util.Random @@ -126,61 +125,59 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") { - for (dataType <- supportedDataTypes) { - for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { - val extraMessage = if (isParquetDataSource) { - s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" - } else { - "" - } - logInfo(s"Testing $dataType data type$extraMessage") - - val extraOptions = Map[String, String]( - "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString, - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX" - ) - - withTempPath { file => - val path = file.getCanonicalPath - - val seed = System.nanoTime() - withClue(s"Random data generated with the seed: ${seed}") { - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(seed) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") - } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. 
- val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .save(path) - - val loadedDF = spark - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .options(extraOptions) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) + for (dataType <- supportedDataTypes) { + for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { + val extraMessage = if (isParquetDataSource) { + s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" + } else { + "" + } + logInfo(s"Testing $dataType data type$extraMessage") + + val extraOptions = Map[String, String]( + "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString, + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX" + ) + + withTempPath { file => + val path = file.getCanonicalPath + + val seed = System.nanoTime() + withClue(s"Random data generated with the seed: ${seed}") { + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(seed) + ).getOrElse { + fail(s"Failed to create data generator for schema $dataType") } + + // Create a DF for the schema with random data. The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = spark + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) } } }
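
Note for reviewers (an illustration, not part of the patch): the removed `LegacyDateFormatter` / `LegacyTimestampFormatter` classes delegated to Apache Commons `FastDateFormat`, which is documented as a drop-in replacement for `java.text.SimpleDateFormat`, and `LegacyFallbackDateFormatter` additionally fell back to `DateTimeUtils.stringToTime` and to reading a bare string as a day count. The `Iso8601*Formatter` classes that remain are built on `java.time`, which parses strictly. A minimal JDK-only sketch of that difference, with made-up object, value, and pattern names:

```scala
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.util.Locale

// Standalone illustration (JDK only, names made up): why parsing gets stricter
// once the SimpleDateFormat-style fallback path is gone.
object StrictVsLenientDateParsing {
  def main(args: Array[String]): Unit = {
    val pattern = "yyyy-MM-dd"
    val input = "2015-01-01 23:50:59"  // trailing time component the pattern does not cover

    // Legacy-style path: java.text.SimpleDateFormat parses a valid prefix and silently
    // ignores the unparsed tail. (commons-lang3 FastDateFormat, the class the removed
    // LegacyDateFormatter actually used, is designed as a drop-in for SimpleDateFormat.)
    val lenient = new java.text.SimpleDateFormat(pattern, Locale.US)
    println(s"lenient: ${lenient.parse(input)}")

    // New path: java.time, which Iso8601DateFormatter builds on, requires the whole
    // input to match the pattern and throws on leftover text.
    val strict = DateTimeFormatter.ofPattern(pattern, Locale.US)
    try {
      println(s"strict: ${LocalDate.parse(input, strict)}")
    } catch {
      case e: DateTimeParseException => println(s"strict: rejected (${e.getMessage})")
    }

    // The removed LegacyFallbackDateFormatter also accepted a bare day count such as
    // "16436" (days since the epoch, as written by Spark 1.5.0); that fallback is gone too.
  }
}
```

Inputs that only parsed through the old fallback, such as day-count strings written by Spark 1.5.0, will presumably need an explicit cast or a pre-parsing step after this change.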
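
And a small end-to-end sketch of the JSON timestamp inference that the simplified `JsonInferSchemaSuite` / `JsonSuite` tests above now exercise unconditionally; the session setup and object name are assumptions, not code from this patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: mirrors what the simplified tests assert, now without any
// legacy-parser branch.
object JsonTimestampInference {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("json-timestamp-inference")
      .getOrCreate()
    import spark.implicits._

    val jsonLines = Seq("""{"a": "2018-12-02T21:04:00.123567+01:00"}""").toDS()

    // Schema inference always runs through the java.time-based Iso8601TimestampFormatter,
    // so a string matching the given timestampFormat is inferred as TimestampType.
    val df = spark.read
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX")
      .json(jsonLines)

    df.printSchema()  // a: timestamp
    spark.stop()
  }
}
```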