[SPARK-25425][SQL] Extra options should override session options in DataSource V2

## What changes were proposed in this pull request?

In the PR, I propose letting extra options override session options in DataSource V2. Extra options are more specific, since they are set per operation via `.option()`, so they should overwrite the more generic session options. When two maps are concatenated, entries from the second map overwrite entries with the same key from the first map, for example:
```scala
scala> Map("option" -> false) ++ Map("option" -> true)
res0: scala.collection.immutable.Map[String,Boolean] = Map(option -> true)
```
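
To make the intended precedence concrete, here is a minimal standalone sketch (the option values are invented; `sessionOptions` and `extraOptions` mirror the variable names in the diffs below):

```scala
object MergeOrderSketch extends App {
  // Options derived from session configs (spark.datasource.<keyPrefix>.<option>).
  val sessionOptions = Map("optionA" -> "true")
  // Options set explicitly on the reader/writer via .option().
  val extraOptions = Map("optionA" -> "false")

  // Old order: extra options came first, so the session option won.
  assert((extraOptions ++ sessionOptions)("optionA") == "true")
  // New order: session options come first, so the explicit .option() wins.
  assert((sessionOptions ++ extraOptions)("optionA") == "false")
}
```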

## How was this patch tested?

Added tests checking which option is propagated to a data source during reading in `load()` and during writing in `save()`.

Closes #22413 from MaxGekk/session-options.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
3 people committed Sep 16, 2018
1 parent bb2f069 commit e06da95
Showing 4 changed files with 45 additions and 6 deletions.
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

```diff
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
       }
       Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-        ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+        ds, sessionOptions ++ extraOptions.toMap + pathsOption,
        userSpecifiedSchema = userSpecifiedSchema))
     } else {
       loadV1Source(paths: _*)
```
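
As a usage-level illustration of the read path after this change (the config key, prefix, and source class are hypothetical stand-ins, not part of this patch):

```scala
// Suppose a source declares keyPrefix "mySource" via SessionConfigSupport.
spark.conf.set("spark.datasource.mySource.optionA", "true")

// The per-read option now takes precedence over the session default:
val df = spark.read
  .format("org.example.MySourceV2")  // hypothetical DataSourceV2 implementation
  .option("optionA", "false")        // overrides spark.datasource.mySource.optionA
  .load()
```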
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

```diff
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
       source match {
         case provider: BatchWriteSupportProvider =>
-          val options = extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            source,
+            df.sparkSession.sessionState.conf)
+          val options = sessionOptions ++ extraOptions

-          val relation = DataSourceV2Relation.create(source, options.toMap)
+          val relation = DataSourceV2Relation.create(source, options)
           if (mode == SaveMode.Append) {
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
```
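
The write path gets the same precedence. A sketch mirroring the writing test added below (the paths are invented; the session config key follows the `simpleWritableDataSource` prefix that the test source registers):

```scala
// Session-level default path for SimpleWritableDataSource (see keyPrefix below).
spark.conf.set("spark.datasource.simpleWritableDataSource.path", "/tmp/session-path")

// The explicit .option("path", ...) now wins, so nothing lands in /tmp/session-path.
Seq((1L, 2L)).toDF("i", "j").write
  .format(classOf[SimpleWritableDataSource].getName)
  .option("path", "/tmp/option-path")
  .save()
```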
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

```diff
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.sources.v2

+import java.io.File
+
 import test.org.apache.spark.sql.sources.v2._

 import org.apache.spark.SparkException
@@ -317,6 +319,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2, 2)
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
@@ -385,7 +419,6 @@ class SimpleDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
   }
 }

-
 class AdvancedDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {

   class ReadSupport extends SimpleReadSupport {
```
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

```diff
@@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration
  * Each job moves files from `target/_temporary/queryId/` to `target`.
  */
 class SimpleWritableDataSource extends DataSourceV2
-  with BatchReadSupportProvider with BatchWriteSupportProvider {
+  with BatchReadSupportProvider
+  with BatchWriteSupportProvider
+  with SessionConfigSupport {

   private val schema = new StructType().add("i", "long").add("j", "long")

+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {

     override def fullSchema(): StructType = schema
```
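
For context, `SessionConfigSupport` is what makes a source eligible for session options at all: `DataSourceV2Utils.extractSessionConfigs` collects session configs under `spark.datasource.<keyPrefix>.` and hands them to the source as options. A minimal sketch of an implementor (the class name and prefix are illustrative):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

// With keyPrefix "mySource", the session config
//   spark.datasource.mySource.optionA=true
// reaches the source as the option "optionA" -> "true" -- unless the caller
// passes .option("optionA", ...), which after this patch takes precedence.
class MySourceV2 extends DataSourceV2 with SessionConfigSupport {
  override def keyPrefix: String = "mySource"
}
```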
