
Commit 44c95d2

fix

7mming7 committed Apr 23, 2024
1 parent 95496a4 commit 44c95d2
Showing 1 changed file with 36 additions and 68 deletions.
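
In short, this commit deduplicates the per-task Hadoop Configuration setup: the Parquet reader keys that were previously set both on the driver-side hadoopConf (before broadcasting) and again in the executor-side fallback branch are now applied once, after each task obtains its Configuration from either source. A minimal sketch of that pattern in Scala, assuming hadoop-common on the classpath; `buildTaskConf` and `settings` are hypothetical names for illustration, not code from the commit:

import org.apache.hadoop.conf.Configuration

// Sketch of the refactor: pick the base Configuration first, then apply the
// shared settings once, instead of repeating them in each branch.
object ConfSetupSketch {
  def buildTaskConf(
      broadcasted: Option[Configuration],
      settings: Map[String, String]): Configuration = {
    val conf = broadcasted.getOrElse(new Configuration())
    settings.foreach { case (k, v) => conf.set(k, v) }
    conf
  }

  def main(args: Array[String]): Unit = {
    val conf = buildTaskConf(None, Map("spark.sql.session.timeZone" -> "UTC"))
    println(conf.get("spark.sql.session.timeZone")) // prints UTC
  }
}

The diff below shows the same move with the actual SQLConf keys.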
@@ -160,37 +160,6 @@ class ParquetFileFormat
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

-    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
-    hadoopConf.set(
-      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      rowWithSchema)
-    hadoopConf.set(
-      ParquetWriteSupport.SPARK_ROW_SCHEMA,
-      rowWithSchema)
-    hadoopConf.set(
-      SQLConf.SESSION_LOCAL_TIMEZONE.key,
-      localTimezone)
-    hadoopConf.setBoolean(
-      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-      nestedSchemaPruningEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.CASE_SENSITIVE.key,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    // Sets flags for `ParquetToSparkSchemaConverter`
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      isParquetBinaryAsString)
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      isParquetINT96AsTimestamp)
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-      parquetInferTimestampNTZEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-      legacyParquetNanosAsLong)
-
val broadcastedHadoopConf = options.map(_ =>
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))

@@ -215,50 +184,49 @@ class ParquetFileFormat
val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])

-    val sharedConf = if (!broadcastedHadoopConf.isEmpty) {
+    val conf = if (!broadcastedHadoopConf.isEmpty) {
broadcastedHadoopConf.head.value.value
} else {
-      val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-      conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
-      conf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        rowWithSchema)
-      conf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        rowWithSchema)
-      conf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        localTimezone)
-      conf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        nestedSchemaPruningEnabled)
-      conf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        isCaseSensitive)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      conf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        isParquetBinaryAsString)
-      conf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        isParquetINT96AsTimestamp)
-      conf.setBoolean(
-        SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-        parquetInferTimestampNTZEnabled)
-      conf.setBoolean(
-        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-        legacyParquetNanosAsLong)
-      conf
+      SparkHadoopUtil.get.newConfiguration(sparkConf)
}
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
+    conf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      rowWithSchema)
+    conf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      rowWithSchema)
+    conf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      localTimezone)
+    conf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      nestedSchemaPruningEnabled)
+    conf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      isCaseSensitive)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    conf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      isParquetBinaryAsString)
+    conf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      isParquetINT96AsTimestamp)
+    conf.setBoolean(
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      parquetInferTimestampNTZEnabled)
+    conf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      legacyParquetNanosAsLong)

val fileFooter = if (enableVectorizedReader) {
// When there are vectorized reads, we can avoid reading the footer twice by reading
// all row groups in advance and filter row groups according to filters that require
// push down (no need to read the footer metadata again).
-      ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+      ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS)
} else {
-      ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+      ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
}

val footerFileMetaData = fileFooter.getFileMetaData
@@ -300,15 +268,15 @@ class ParquetFileFormat

val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
-        Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}


val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
-      new TaskAttemptContextImpl(sharedConf, attemptId)
+      new TaskAttemptContextImpl(conf, attemptId)

// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.