[SPARK-44305][SQL] Broadcast operation is not required when no parameters are specified
7mming7 committed Apr 22, 2024
1 parent b20356e commit a63c38a
Showing 1 changed file with 81 additions and 6 deletions.
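
For context, here is a minimal sketch of the pattern this commit applies, using hypothetical names (`planConfBroadcast`, `confForTask`) rather than the real ParquetFileFormat internals: the driver broadcasts the Hadoop configuration only when the caller supplied read options, and each task otherwise rebuilds an equivalent configuration locally from the automatically shipped SparkConf. `SerializableConfiguration` and `SparkHadoopUtil` are Spark-internal helpers (both appear in the diff below), so this only compiles inside Spark's own packages; treat it as an illustration, not public API, and not part of the commit itself.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

object ConditionalConfBroadcast {

  // Driver side: broadcast the (already populated) Hadoop conf only if there are
  // caller-supplied options, i.e. only if there is per-query state worth shipping.
  def planConfBroadcast(
      spark: SparkSession,
      options: Map[String, String],
      hadoopConf: Configuration): Option[Broadcast[SerializableConfiguration]] = {
    if (options.nonEmpty) {
      Some(spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
    } else {
      None
    }
  }

  // Executor side: prefer the broadcast value; with no broadcast, rebuild the
  // configuration from SparkConf and re-apply the same settings the driver set.
  def confForTask(
      maybeBroadcast: Option[Broadcast[SerializableConfiguration]],
      sparkConf: SparkConf)(applySettings: Configuration => Unit): Configuration = {
    maybeBroadcast.map(_.value.value).getOrElse {
      val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
      applySettings(conf)
      conf
    }
  }
}

The actual change below wraps the broadcast in `options.map(...)` and reads it back via `isEmpty`/`head`; `Option` is used here only to keep the sketch compact.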
@@ -32,6 +32,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GRO
import org.apache.parquet.hadoop._

import org.apache.spark.TaskContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{PATH, SCHEMA}
import org.apache.spark.sql._
@@ -159,15 +160,19 @@ class ParquetFileFormat
      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
      sparkSession.sessionState.conf.legacyParquetNanosAsLong)

    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    // TODO: if you move this into the closure it reverts to the default values.
    // If true, enable using the custom RecordReader for parquet. This only works for
    // a subset of the types (no complex types).
    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
    val sqlConf = sparkSession.sessionState.conf
    val readSupportClass = classOf[ParquetReadSupport].getName
    val rowWithSchema = requiredSchema.json
    val localTimezone = sqlConf.sessionLocalTimeZone
    val nestedSchemaPruningEnabled = sqlConf.nestedSchemaPruningEnabled
    val isParquetBinaryAsString = sqlConf.isParquetBinaryAsString
    val isParquetINT96AsTimestamp = sqlConf.isParquetINT96AsTimestamp
    val parquetInferTimestampNTZEnabled = sqlConf.parquetInferTimestampNTZEnabled
    val legacyParquetNanosAsLong = sqlConf.legacyParquetNanosAsLong
    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
    val enableVectorizedReader: Boolean =
      ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
@@ -185,6 +190,40 @@ class ParquetFileFormat
    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
    hadoopConf.set(
      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
      rowWithSchema)
    hadoopConf.set(
      ParquetWriteSupport.SPARK_ROW_SCHEMA,
      rowWithSchema)
    hadoopConf.set(
      SQLConf.SESSION_LOCAL_TIMEZONE.key,
      localTimezone)
    hadoopConf.setBoolean(
      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
      nestedSchemaPruningEnabled)
    hadoopConf.setBoolean(
      SQLConf.CASE_SENSITIVE.key,
      sparkSession.sessionState.conf.caseSensitiveAnalysis)

    // Sets flags for `ParquetToSparkSchemaConverter`
    hadoopConf.setBoolean(
      SQLConf.PARQUET_BINARY_AS_STRING.key,
      isParquetBinaryAsString)
    hadoopConf.setBoolean(
      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
      isParquetINT96AsTimestamp)
    hadoopConf.setBoolean(
      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
      parquetInferTimestampNTZEnabled)
    hadoopConf.setBoolean(
      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
      legacyParquetNanosAsLong)

    val broadcastedHadoopConf = options.map(_ =>
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))

    // Should always be set by FileSourceScanExec creating this.
    // Check conf before checking option, to allow working around an issue by changing conf.
    val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
@@ -199,13 +238,49 @@ class ParquetFileFormat
      assert(supportBatch(sparkSession, resultSchema))
    }

    val sparkConf = sparkSession.sparkContext.conf
    (file: PartitionedFile) => {
      assert(file.partitionValues.numFields == partitionSchema.size)

      val filePath = file.toPath
      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])

      val sharedConf = broadcastedHadoopConf.value.value
      val sharedConf = if (!broadcastedHadoopConf.isEmpty) {
        broadcastedHadoopConf.head.value.value
      } else {
        val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
        conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, readSupportClass)
        conf.set(
          ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
          rowWithSchema)
        conf.set(
          ParquetWriteSupport.SPARK_ROW_SCHEMA,
          rowWithSchema)
        conf.set(
          SQLConf.SESSION_LOCAL_TIMEZONE.key,
          localTimezone)
        conf.setBoolean(
          SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
          nestedSchemaPruningEnabled)
        conf.setBoolean(
          SQLConf.CASE_SENSITIVE.key,
          isCaseSensitive)

        // Sets flags for `ParquetToSparkSchemaConverter`
        conf.setBoolean(
          SQLConf.PARQUET_BINARY_AS_STRING.key,
          isParquetBinaryAsString)
        conf.setBoolean(
          SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
          isParquetINT96AsTimestamp)
        conf.setBoolean(
          SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
          parquetInferTimestampNTZEnabled)
        conf.setBoolean(
          SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
          legacyParquetNanosAsLong)
        conf
      }

      val fileFooter = if (enableVectorizedReader) {
        // When there are vectorized reads, we can avoid reading the footer twice by reading
@@ -263,7 +338,7 @@ class ParquetFileFormat

      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
        new TaskAttemptContextImpl(sharedConf, attemptId)

      // Try to push down filters when filter push-down is enabled.
      // Notice: This push-down is RowGroups level, not individual records.