[SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression
As per HyukjinKwon's request on #38312, this backports the fix to 3.3.
### What changes were proposed in this pull request?

Handle `TimeUnit.NANOS` for Parquet `Timestamps`, addressing a behaviour regression present since 3.2.

### Why are the changes needed?

Since version 3.2, reading Parquet files that contain attributes of type `TIMESTAMP(NANOS,true)` is not possible, as `ParquetSchemaConverter` fails with:
```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```
https://issues.apache.org/jira/browse/SPARK-34661 introduced a change that matches on the `LogicalTypeAnnotation`, which only covers the Timestamp cases `TimeUnit.MILLIS` and `TimeUnit.MICROS`, so `TimeUnit.NANOS` falls through to `illegalType()`.

Prior to 3.2, the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returned `null` and therefore resulted in a `LongType`. The proposed change is to also consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.
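
In sketch form, the restored behaviour can be illustrated as follows (a standalone illustration, not the actual Spark converter; `nanosTimestampToCatalyst` is a hypothetical helper, while the annotation match mirrors the `ParquetToSparkSchemaConverter` hunk in the diff below):

```scala
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.{TimeUnit, TimestampLogicalTypeAnnotation}
import org.apache.spark.sql.types.{DataType, LongType}

// Hypothetical helper: an INT64 TIMESTAMP(NANOS, *) annotation maps to LongType
// when the legacy flag is enabled; any other annotation is left to the existing
// converter logic (which still rejects NANOS when the flag is off).
def nanosTimestampToCatalyst(
    annotation: LogicalTypeAnnotation,
    nanosAsLong: Boolean): Option[DataType] = annotation match {
  case ts: TimestampLogicalTypeAnnotation
      if ts.getUnit == TimeUnit.NANOS && nanosAsLong =>
    Some(LongType)
  case _ => None
}
```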

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test covering this scenario.
Also verified by an internal deployment reading Parquet files that contain `TIMESTAMP(NANOS,true)`.
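
For reference, a minimal way to exercise this path (the file path and column name are hypothetical; the flag `spark.sql.legacy.parquet.nanosAsLong` is defined in `SQLConf` in the diff below):

```scala
// Sketch, assuming an existing SparkSession `spark` and a Parquet file whose
// "ts" column was written as TIMESTAMP(NANOS,true).
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

val df = spark.read.parquet("/path/to/timestamp-nanos.parquet")
df.printSchema()          // "ts" is read back as LongType, as it was before Spark 3.2
df.select("ts").show(1)   // raw nanosecond values as longs
```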

Closes #39904 from awdavidson/ts-nanos-fix-3.3.

Lead-authored-by: alfreddavidson <alfie.davidson9@gmail.com>
Co-authored-by: awdavidson <54780428+awdavidson@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2 people authored and HyukjinKwon committed Feb 8, 2023
1 parent 51ed6ba commit 3ec9b05
Showing 7 changed files with 104 additions and 12 deletions.
@@ -3459,6 +3459,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
.internal()
.doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
.version("3.2.3")
.booleanConf
.createWithDefault(false)

val PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.parquet.int96RebaseModeInWrite")
.internal()
@@ -4525,6 +4532,8 @@ class SQLConf extends Serializable with Logging {

def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)

def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)

def histogramNumericPropagateInputType: Boolean =
@@ -149,6 +149,7 @@ protected void initialize(String path, List<String> columns) throws IOException
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);

this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -199,6 +200,7 @@ protected void initialize(
config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.parquetColumn = new ParquetToSparkSchemaConverter(config)
.convertParquetColumn(requestedSchema, Option.empty());
this.sparkSchema = (StructType) parquetColumn.sparkType();
@@ -123,6 +123,10 @@ class ParquetFileFormat
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)

conf.set(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

@@ -239,6 +243,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong)

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -440,7 +447,8 @@ object ParquetFileFormat extends Logging {

val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)

val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -536,12 +544,14 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong

val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
nanosAsLong = nanosAsLong)

readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
@@ -49,21 +49,25 @@ import org.apache.spark.util.Utils
* [[TimestampType]] fields.
* @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
* schema with Parquet schema
* @param nanosAsLong Whether timestamps with nanos are converted to long.
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get) {
caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {

def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
caseSensitive = conf.caseSensitiveAnalysis)
caseSensitive = conf.caseSensitiveAnalysis,
nanosAsLong = conf.legacyParquetNanosAsLong)

def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean)
caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)


/**
@@ -257,6 +261,11 @@ class ParquetToSparkSchemaConverter(
// SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
if (Utils.isTesting) TimestampNTZType else TimestampType
}
// SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
// timezone awareness to address behaviour regression introduced by SPARK-34661
case timestamp: TimestampLogicalTypeAnnotation
if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
LongType
case _ => illegalType()
}

@@ -86,6 +86,10 @@ case class ParquetScan(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)

hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong)

val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
val sqlConf = sparkSession.sessionState.conf
Binary file not shown.
@@ -30,6 +30,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
import org.apache.spark.sql.test.SharedSparkSession
@@ -46,15 +47,17 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
messageType,
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
expectedParquetColumn = expectedParquetColumn)
expectedParquetColumn = expectedParquetColumn,
nanosAsLong = nanosAsLong)
}

protected def testParquetToCatalyst(
@@ -65,11 +68,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
int96AsTimestamp: Boolean,
caseSensitive: Boolean = false,
sparkReadSchema: Option[StructType] = None,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
caseSensitive = caseSensitive)
caseSensitive = caseSensitive,
nanosAsLong = nanosAsLong)

test(s"sql <= parquet: $testName") {
val actualParquetColumn = converter.convertParquetColumn(
@@ -117,7 +122,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {

testCatalystToParquet(
testName,
@@ -132,7 +138,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
parquetSchema,
binaryAsString,
int96AsTimestamp,
expectedParquetColumn = expectedParquetColumn)
expectedParquetColumn = expectedParquetColumn,
nanosAsLong = nanosAsLong)
}

protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -147,7 +154,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
val expectedDesc = expected.descriptor.get
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)

actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
assert(actual.sparkType == expected.sparkType)
case _ =>
assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
}
}

assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -195,6 +209,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
}

class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
testSchemaInference[Tuple1[Long]](
"timestamp nanos",
"""
|message root {
| required int64 _1 (TIMESTAMP(NANOS,true));
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true,
writeLegacyParquetFormat = true,
expectedParquetColumn = Some(
ParquetColumn(
sparkType = StructType.fromAttributes(
ScalaReflection.attributesFor[Tuple1[Long]]),
descriptor = None,
repetitionLevel = 0,
definitionLevel = 0,
required = false,
path = Seq(),
children = Seq(
primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
))),
nanosAsLong = true
)

testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
@@ -1005,6 +1045,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
val tsAttribute = "birthday"
withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
val testDataPath = testFile("test-data/timestamp-nanos.parquet")
val data = spark.read.parquet(testDataPath).select(tsAttribute)
assert(data.schema.fields.head.dataType == LongType)
assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
}
}

test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
val testDataPath = testFile("test-data/timestamp-nanos.parquet")
val e = intercept[SparkException] {
spark.read.parquet(testDataPath).collect()
}
assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))"))
}

// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================
