[SPARK-25700][SQL][BRANCH-2.4] Partially revert append mode support in Data Source V2

## What changes were proposed in this pull request?

This PR proposes to partially revert 5fef6e3 so that branch-2.4 does not create a ReadSupport, and does not read a schema, on the write path, since that change was too breaking.

5fef6e3 happened to create a ReadSupport in the write path, which ended up reading the schema from that ReadSupport while writing.

For instance, this breaks the `spark.range(1).write.format("source").save("non-existent-path")` case, since there is no way to read a schema from "non-existent-path".
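To make the failure mode concrete, here is a minimal repro sketch (assumptions: `"source"` stands for any `WriteSupport`-only V2 implementation, and the `Spark25700Repro` wrapper exists only for this example):

```scala
import org.apache.spark.sql.SparkSession

object Spark25700Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-25700 repro")
      .getOrCreate()
    // Before this revert, branch-2.4 built a DataSourceV2Relation on the
    // write path, which required a ReadSupport and a readable schema at
    // "non-existent-path"; so even a pure write failed.
    spark.range(1).write.format("source").save("non-existent-path")
    spark.stop()
  }
}
```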

See also #22009 (comment)
See also #22688
See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes #22697 from HyukjinKwon/append-revert-2.4.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
HyukjinKwon authored and cloud-fan committed Oct 15, 2018
1 parent 3e776d7 commit b6e4aca
Showing 3 changed files with 35 additions and 17 deletions.
20 changes: 6 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -250,22 +250,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             source,
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
-          val relation = DataSourceV2Relation.create(source, options)
-
-          if (mode == SaveMode.Append) {
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
+          val writer = ws.createWriter(
+            UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
+            new DataSourceOptions(options.asJava))
 
-          } else {
-            val writer = ws.createWriter(
-              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+          if (writer.isPresent) {
+            runCommand(df.sparkSession, "save") {
+              WriteToDataSourceV2(writer.get, df.logicalPlan)
            }
          }
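To make the restored control flow concrete, here is a hedged standalone sketch (the `SaveV2Sketch` wrapper and its `println` placeholder are assumptions for exposition; the real code lives in `DataFrameWriter.save` above): every `SaveMode`, including `Append`, now goes straight to `WriteSupport.createWriter`, so nothing on the write path asks for a `ReadSupport` or a schema.

```scala
import java.util.UUID

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, WriteSupport}
import org.apache.spark.sql.types.StructType

object SaveV2Sketch {
  // Mirrors the restored branch-2.4 dispatch: ask the source for a writer
  // directly, for every save mode; no DataSourceV2Relation (and hence no
  // reader or schema lookup) is created first.
  def saveV2(
      ws: WriteSupport,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Unit = {
    val writer = ws.createWriter(UUID.randomUUID.toString, schema, mode, options)
    if (writer.isPresent) {
      // In Spark this becomes WriteToDataSourceV2(writer.get, plan), run via
      // runCommand; a println stands in for that here.
      println(s"would execute WriteToDataSourceV2 with ${writer.get}")
    }
  }
}
```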

27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -355,6 +355,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
@@ -594,3 +610,14 @@ class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
 
   override def close(): Unit = {}
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky: the source still implements read support but throws
+    // during schema retrieval. It might deserve a rewrite, but it is kept this
+    // way to minimise the change.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
5 changes: 2 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,12 +43,12 @@ class SimpleWritableDataSource extends DataSourceV2
   with WriteSupport
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema() = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
-    override def readSchema(): StructType = schema
+    override def readSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       val dataPath = new Path(path)
@@ -113,7 +113,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[DataSourceWriter] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
 
     val path = new Path(options.get("path").get())
