Skip to content

Commit

Permalink
Make ORC format propagate Hadoop config from DS options to underlying…
Browse files Browse the repository at this point in the history
… HDFS file system
  • Loading branch information
MaxGekk committed Oct 8, 2020
1 parent 7d6e3fb commit ef6d7f9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ object OrcUtils extends Logging {
}
}

def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConf()
val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
Expand Down Expand Up @@ -125,7 +125,7 @@ object OrcUtils extends Logging {
SchemaMergeUtils.mergeSchemasInParallel(
sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
} else {
OrcUtils.readSchema(sparkSession, files)
OrcUtils.readSchema(sparkSession, files, options)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -537,6 +537,21 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}

test("SPARK-33094: should propagate Hadoop config from DS options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
Seq(false, true).foreach { mergeSchema =>
withTempPath { dir =>
val path = dir.getAbsolutePath
val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
spark.range(1).write.options(conf).orc(path)
checkAnswer(spark.read.options(conf).orc(path), Row(0))
}
}
}
}
}

class OrcSourceSuite extends OrcSuite with SharedSparkSession {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf()),
Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
ignoreCorruptFiles
)
}
Expand Down

0 comments on commit ef6d7f9

Please sign in to comment.