diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 1a1d73f6607f2..78e560746c851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -160,37 +160,6 @@ class ParquetFileFormat
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

-    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
-    hadoopConf.set(
-      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      rowWithSchema)
-    hadoopConf.set(
-      ParquetWriteSupport.SPARK_ROW_SCHEMA,
-      rowWithSchema)
-    hadoopConf.set(
-      SQLConf.SESSION_LOCAL_TIMEZONE.key,
-      localTimezone)
-    hadoopConf.setBoolean(
-      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-      nestedSchemaPruningEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.CASE_SENSITIVE.key,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    // Sets flags for `ParquetToSparkSchemaConverter`
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      isParquetBinaryAsString)
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      isParquetINT96AsTimestamp)
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-      parquetInferTimestampNTZEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-      legacyParquetNanosAsLong)
-
     val broadcastedHadoopConf =
       options.map(_ => sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
@@ -215,50 +184,49 @@ class ParquetFileFormat
       val filePath = file.toPath
       val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])

-      val sharedConf = if (!broadcastedHadoopConf.isEmpty) {
+      val conf = if (!broadcastedHadoopConf.isEmpty) {
         broadcastedHadoopConf.head.value.value
       } else {
-        val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-        conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
-        conf.set(
-          ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-          rowWithSchema)
-        conf.set(
-          ParquetWriteSupport.SPARK_ROW_SCHEMA,
-          rowWithSchema)
-        conf.set(
-          SQLConf.SESSION_LOCAL_TIMEZONE.key,
-          localTimezone)
-        conf.setBoolean(
-          SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-          nestedSchemaPruningEnabled)
-        conf.setBoolean(
-          SQLConf.CASE_SENSITIVE.key,
-          isCaseSensitive)
-
-        // Sets flags for `ParquetToSparkSchemaConverter`
-        conf.setBoolean(
-          SQLConf.PARQUET_BINARY_AS_STRING.key,
-          isParquetBinaryAsString)
-        conf.setBoolean(
-          SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-          isParquetINT96AsTimestamp)
-        conf.setBoolean(
-          SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-          parquetInferTimestampNTZEnabled)
-        conf.setBoolean(
-          SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-          legacyParquetNanosAsLong)
-        conf
+        SparkHadoopUtil.get.newConfiguration(sparkConf)
       }
+      conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
+      conf.set(
+        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+        rowWithSchema)
+      conf.set(
+        ParquetWriteSupport.SPARK_ROW_SCHEMA,
+        rowWithSchema)
+      conf.set(
+        SQLConf.SESSION_LOCAL_TIMEZONE.key,
+        localTimezone)
+      conf.setBoolean(
+        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+        nestedSchemaPruningEnabled)
+      conf.setBoolean(
+        SQLConf.CASE_SENSITIVE.key,
+        isCaseSensitive)
+
+      // Sets flags for `ParquetToSparkSchemaConverter`
+      conf.setBoolean(
+        SQLConf.PARQUET_BINARY_AS_STRING.key,
+        isParquetBinaryAsString)
+      conf.setBoolean(
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+        isParquetINT96AsTimestamp)
+      conf.setBoolean(
+        SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+        parquetInferTimestampNTZEnabled)
+      conf.setBoolean(
+        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+        legacyParquetNanosAsLong)

       val fileFooter = if (enableVectorizedReader) {
         // When there are vectorized reads, we can avoid reading the footer twice by reading
         // all row groups in advance and filter row groups according to filters that require
         // push down (no need to read the footer metadata again).
-        ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+        ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS)
       } else {
-        ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+        ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
       }
       val footerFileMetaData = fileFooter.getFileMetaData
@@ -300,7 +268,7 @@ class ParquetFileFormat
       val convertTz =
         if (timestampConversion && !isCreatedByParquetMr) {
-          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+          Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
@@ -308,7 +276,7 @@ class ParquetFileFormat
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
-        new TaskAttemptContextImpl(sharedConf, attemptId)
+        new TaskAttemptContextImpl(conf, attemptId)

       // Try to push down filters when filter push-down is enabled.
       // Notice: This push-down is RowGroups level, not individual records.
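
For reference, a minimal sketch of the shape the per-file reader code converges on after this change: the Hadoop `Configuration` is resolved once (from the broadcast `SerializableConfiguration` when available, otherwise built fresh from `SparkConf`), and the Parquet read options are then applied in a single place on whichever instance was chosen. The object name `ParquetConfResolutionSketch`, the method `resolvePerTaskConf`, and its parameter list are hypothetical; only the APIs it calls (`Broadcast.value`, `SerializableConfiguration.value`, `SparkHadoopUtil.get.newConfiguration`, `Configuration.set`/`setBoolean`, and the `ParquetReadSupport`/`ParquetWriteSupport`/`SQLConf` keys) are taken from the diff above.

```scala
// Sketch only; not part of the patch. Placed in the parquet package so the
// package-private Spark helpers used by the diff are accessible.
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration

private object ParquetConfResolutionSketch {

  /**
   * Resolve the Hadoop Configuration for a per-file read: reuse the broadcast
   * configuration when one exists, otherwise build a fresh one from SparkConf.
   * The Parquet read options are applied unconditionally after the conf is chosen,
   * mirroring the structure of the patched reader closure.
   */
  def resolvePerTaskConf(
      broadcastedHadoopConf: Option[Broadcast[SerializableConfiguration]],
      sparkConf: SparkConf,
      requestedSchemaJson: String,
      sessionLocalTimeZone: String,
      isCaseSensitive: Boolean): Configuration = {
    val conf = broadcastedHadoopConf
      .map(_.value.value) // unwrap Broadcast, then SerializableConfiguration
      .getOrElse(SparkHadoopUtil.get.newConfiguration(sparkConf))

    // Same settings as in the diff above, set once on the resolved conf.
    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requestedSchemaJson)
    conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requestedSchemaJson)
    conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sessionLocalTimeZone)
    conf.setBoolean(SQLConf.CASE_SENSITIVE.key, isCaseSensitive)
    conf
  }
}
```

The `map(...).getOrElse(...)` form is a stylistic stand-in for the diff's `if (!broadcastedHadoopConf.isEmpty) ... else ...`; both resolve the same two sources in the same order.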