Merge pull request #8 from dongjoon-hyun/SPARK-25425
Add a test case for the write path. Thank you very much, @dongjoon-hyun
MaxGekk authored Sep 15, 2018
2 parents eba46d9 + d35d01b commit 325b9c4
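
This commit adds a write-path counterpart to the SPARK-25425 read-path test: options passed explicitly through option(...) on a reader or writer must take precedence over session-level defaults registered under spark.datasource.<keyPrefix>.*. The snippet below is a minimal sketch of that precedence rule only, not Spark's implementation; the helper name effectiveOptions is invented for illustration.

// Sketch only (assumed helper, not Spark's code): session-derived options form
// the base map, and explicit reader/writer options win on conflicting keys.
def effectiveOptions(
    sessionOptions: Map[String, String],
    extraOptions: Map[String, String]): Map[String, String] = {
  sessionOptions ++ extraOptions  // right-hand operand wins on duplicate keys
}

// Example mirroring the new write test: the session conf supplies a path and
// the query-level option overrides it.
effectiveOptions(
  Map("path" -> "/tmp/session-dir"),
  Map("path" -> "/tmp/query-dir"))  // => Map(path -> /tmp/query-dir)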
Showing 2 changed files with 25 additions and 2 deletions.
DataSourceV2Suite.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.sources.v2

+import java.io.File
+
 import test.org.apache.spark.sql.sources.v2._

 import org.apache.spark.SparkException
@@ -318,7 +320,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }

-  test("SPARK-25425: extra options override sessions options") {
+  test("SPARK-25425: extra options should override sessions options during reading") {
     val prefix = "spark.datasource.userDefinedDataSource."
     val optionName = "optionA"
     withSQLConf(prefix + optionName -> "true") {
@@ -332,6 +334,23 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       assert(options.get.get(optionName) == Some("false"))
     }
   }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
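
Outside the suite's withTempPath and withSQLConf helpers, the behavior the new test pins down looks roughly like the sketch below. It assumes a SparkSession named spark with its implicits imported and uses placeholder paths; SimpleWritableDataSource is a test-only source, so this is illustrative rather than a recipe for production code.

import spark.implicits._  // assumed: a SparkSession named spark is in scope

// Session-level default path for the source (placeholder value).
spark.conf.set("spark.datasource.simpleWritableDataSource.path", "/tmp/session-dir")

// The explicit "path" option is expected to win, so the data lands under
// /tmp/query-dir and /tmp/session-dir is never created.
Seq((1L, 2L)).toDF("i", "j")
  .write
  .format(classOf[SimpleWritableDataSource].getName)
  .option("path", "/tmp/query-dir")
  .save()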


SimpleWritableDataSource.scala
@@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration
  * Each job moves files from `target/_temporary/queryId/` to `target`.
  */
 class SimpleWritableDataSource extends DataSourceV2
-  with BatchReadSupportProvider with BatchWriteSupportProvider {
+  with BatchReadSupportProvider
+  with BatchWriteSupportProvider
+  with SessionConfigSupport {

   private val schema = new StructType().add("i", "long").add("j", "long")

+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {

     override def fullSchema(): StructType = schema
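
The SessionConfigSupport mix-in is what lets session confs reach the source: every SQL conf named spark.datasource.<keyPrefix>.<option> is handed to the source as <option>, unless the same option is given explicitly on the query. A rough sketch of that mapping follows; the helper sessionOptionsFor is an assumed name for illustration, not Spark's internal code.

// Assumed helper for illustration; Spark performs this mapping internally.
def sessionOptionsFor(
    keyPrefix: String,
    sqlConfs: Map[String, String]): Map[String, String] = {
  val prefix = s"spark.datasource.$keyPrefix."
  sqlConfs.collect {
    case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
  }
}

// With keyPrefix = "simpleWritableDataSource", the conf used by the new test
// becomes a plain "path" option; an explicit .option("path", ...) still wins.
sessionOptionsFor(
  "simpleWritableDataSource",
  Map("spark.datasource.simpleWritableDataSource.path" -> "/tmp/session-dir"))
// => Map(path -> /tmp/session-dir)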
