diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c1e2f496682df..b77dfd9ed9e15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -250,22 +250,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             source,
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
-          val relation = DataSourceV2Relation.create(source, options)
-          if (mode == SaveMode.Append) {
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
+          val writer = ws.createWriter(
+            UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
+            new DataSourceOptions(options.asJava))
 
-          } else {
-            val writer = ws.createWriter(
-              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+          if (writer.isPresent) {
+            runCommand(df.sparkSession, "save") {
+              WriteToDataSourceV2(writer.get, df.logicalPlan)
             }
           }
         }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index bafde50bdc012..2367bdd169522 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -355,6 +355,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
@@ -594,3 +610,14 @@ class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
 
   override def close(): Unit = {}
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. It might have to be rewritten, but it is done
+    // this way to minimise the changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 654c62d8edc5c..4cf02595cd96a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,12 +43,12 @@ class SimpleWritableDataSource extends DataSourceV2
   with WriteSupport
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema() = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
-    override def readSchema(): StructType = schema
+    override def readSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       val dataPath = new Path(path)
@@ -113,7 +113,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[DataSourceWriter] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
     val path = new Path(options.get("path").get())