[SPARK-33094][SQL] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system

### What changes were proposed in this pull request?
Propagate ORC options to Hadoop configs in Hive `OrcFileFormat` and in the regular ORC datasource.

### Why are the changes needed?
There is a bug: when running
```scala
spark.read.format("orc").options(conf).load(path)
```
the underlying file system does not receive the `conf` options.
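
The fix threads the DS options through `SessionState.newHadoopConfWithOptions`, which copies the session Hadoop configuration and overlays the given options. A minimal sketch of that behavior (paraphrased for illustration, not Spark's verbatim implementation):

```scala
import org.apache.hadoop.conf.Configuration

// Sketch: overlay datasource options on a copy of the session Hadoop conf.
def newHadoopConfWithOptions(
    base: Configuration,
    options: Map[String, String]): Configuration = {
  val hadoopConf = new Configuration(base)  // copy, don't mutate the session conf
  options.foreach { case (k, v) =>
    // "path"/"paths" are reader bookkeeping, not Hadoop settings
    if (v != null && k != "path" && k != "paths") hadoopConf.set(k, v)
  }
  hadoopConf
}
```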

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added UT to `OrcSourceSuite`.

Closes #29976 from MaxGekk/orc-option-propagation.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
MaxGekk authored and dongjoon-hyun committed Oct 8, 2020
1 parent 4987db8 commit c5f6af9
Showing 3 changed files with 20 additions and 5 deletions.
OrcUtils.scala:

```diff
@@ -81,10 +81,10 @@ object OrcUtils extends Logging {
     }
   }
 
-  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
+  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
-    val conf = sparkSession.sessionState.newHadoopConf()
+    val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
         logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
@@ -125,7 +125,7 @@ object OrcUtils extends Logging {
       SchemaMergeUtils.mergeSchemasInParallel(
         sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
     } else {
-      OrcUtils.readSchema(sparkSession, files)
+      OrcUtils.readSchema(sparkSession, files, options)
     }
   }
 
```
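
With `options` threaded into `readSchema`, per-read Hadoop settings now reach the file system during schema inference. A hedged usage sketch (the option key and path below are illustrative, not from this commit):

```scala
// Illustrative only: any Hadoop-level option passed as a DS option now lands
// in the Hadoop conf used to open files for schema inference.
val df = spark.read
  .format("orc")
  .option("fs.s3a.connection.maximum", "50") // hypothetical Hadoop setting
  .load("s3a://some-bucket/orc-data")        // hypothetical path
```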
OrcSourceSuite.scala:

```diff
@@ -32,7 +32,7 @@ import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
-import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -537,6 +537,21 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-33094: should propagate Hadoop config from DS options to underlying file system") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      Seq(false, true).foreach { mergeSchema =>
+        withTempPath { dir =>
+          val path = dir.getAbsolutePath
+          val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
+          spark.range(1).write.options(conf).orc(path)
+          checkAnswer(spark.read.options(conf).orc(path), Row(0))
+        }
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
```
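
The test registers `FakeFileSystemRequiringDSOption` for the `file` scheme, a test-only `FileSystem` that fails unless the DS option reached its Hadoop conf. A minimal sketch of such a class (assuming it wraps the local file system; not necessarily Spark's exact definition):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.LocalFileSystem

// Sketch: refuse to initialize unless the DS option "ds_option" was
// propagated into the Hadoop configuration handed to the file system.
class FakeFileSystemRequiringDSOption extends LocalFileSystem {
  override def initialize(name: URI, conf: Configuration): Unit = {
    super.initialize(name, conf)
    require(conf.get("ds_option", "") == "value", "DS option was not propagated")
  }
}
```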
OrcFileFormat.scala (Hive):

```diff
@@ -75,7 +75,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf()),
+      Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
       ignoreCorruptFiles
     )
   }
```
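
Because the same propagation was added to the Hive-based `OrcFileFormat`, the behavior holds for either ORC implementation. An illustrative check (the path is hypothetical):

```scala
// Illustrative: select the Hive ORC implementation; DS options still flow
// to the underlying file system during schema inference.
spark.conf.set("spark.sql.orc.impl", "hive")
val df = spark.read
  .format("orc")
  .options(Map("ds_option" -> "value"))  // reaches the Hadoop conf
  .load("/tmp/orc-data")                 // hypothetical path
```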
