From 954ef96c495268e3a22b7b661ce45c558b532c65 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 15 Jan 2019 13:53:48 -0800 Subject: [PATCH] [SPARK-25530][SQL] data source v2 API refactor (batch write) ## What changes were proposed in this pull request? Adjust the batch write API to match the read API refactor after https://github.com/apache/spark/pull/23086 The doc with high-level ideas: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing Basically, it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`, and renames `WriteSupport` to `Write`. It also cleans up some code now that the batch API is complete. This PR also removes the test from https://github.com/apache/spark/pull/22688 . A data source must now return a table for read/write. A few notes about future changes: 1. We will create `SupportsStreamingWrite` later for the streaming APIs. 2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will continue to use `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future. 3. We will remove `SaveMode` from the data source API: https://issues.apache.org/jira/browse/SPARK-26356 ## How was this patch tested? Existing tests. Closes #23208 from cloud-fan/refactor-batch. Authored-by: Wenchen Fan Signed-off-by: gatorsmile --- .../sources/v2/BatchWriteSupportProvider.java | 59 ------------ .../spark/sql/sources/v2/DataSourceV2.java | 10 +- .../sql/sources/v2/SupportsBatchWrite.java | 32 +++++++ .../spark/sql/sources/v2/SupportsRead.java | 5 +- .../spark/sql/sources/v2/SupportsWrite.java | 35 +++++++ ...BatchWriteSupport.java => BatchWrite.java} | 2 +- .../sql/sources/v2/writer/DataWriter.java | 12 +-- .../sources/v2/writer/DataWriterFactory.java | 2 +- .../sources/v2/writer/SupportsSaveMode.java | 26 ++++++ .../sql/sources/v2/writer/WriteBuilder.java | 69 ++++++++++++++ .../v2/writer/WriterCommitMessage.java | 2 +- .../apache/spark/sql/DataFrameReader.scala | 8 +- .../apache/spark/sql/DataFrameWriter.scala | 50 +++++----- .../datasources/v2/DataSourceV2Relation.scala | 89 +++++------------- .../datasources/v2/DataSourceV2Strategy.scala | 14 ++- .../v2/WriteToDataSourceV2Exec.scala | 26 +++--- .../streaming/MicroBatchExecution.scala | 4 +- ...ritSupport.scala => MicroBatchWrite.scala} | 7 +- .../sources/PackedRowWriterFactory.scala | 4 +- .../sql/sources/v2/DataSourceV2Suite.scala | 28 +++--- .../sources/v2/SimpleWritableDataSource.scala | 91 +++++++++++-------- 21 files changed, 330 insertions(+), 245 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/{BatchWriteSupport.java => BatchWrite.java} (99%) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/{MicroBatchWritSupport.scala => MicroBatchWrite.scala} (84%) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java deleted file mode 100644 index df439e2c02fe3..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import java.util.Optional; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data writing ability for batch processing. - * - * This interface is used to create {@link BatchWriteSupport} instances when end users run - * {@code Dataset.write.format(...).option(...).save()}. - */ -@Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { - - /** - * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source, - * which is called by Spark at the beginning of each batch query. - * - * Data sources can return None if there is no writing needed to be done according to the save - * mode. - * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, and the returned - * {@link BatchWriteSupport} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. - * @param mode the save mode which determines what to do when the data are already in this data - * source, please refer to {@link SaveMode} for more details. - * @param options the options for the returned data source writer, which is an immutable - * case-insensitive string-to-string map. - * @return a write support to write data to this data source. - */ - Optional createBatchWriteSupport( - String queryId, - StructType schema, - SaveMode mode, - DataSourceOptions options); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index eae7a45d1d446..4aaa57dd4db9d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -20,15 +20,7 @@ import org.apache.spark.annotation.Evolving; /** - * The base interface for data source v2. Implementations must have a public, 0-arg constructor. - * - * Note that this is an empty interface. 
Data source implementations must mix in interfaces such as - * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide - * batch or streaming read/write support instances. Otherwise it's just a dummy data source which - * is un-readable/writable. - * - * If Spark fails to execute any methods in the implementations of this interface (by throwing an - * exception), the read action will fail and no Spark job will be submitted. + * TODO: remove it when we finish the API refactor for streaming side. */ @Evolving public interface DataSourceV2 {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java new file mode 100644 index 0000000000000..08caadd5308e6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; + +/** + * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. + *

+ * If a {@link Table} implements this interface, the + * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} + * with {@link WriteBuilder#buildForBatch()} implemented. + *

+ */ +@Evolving +public interface SupportsBatchWrite extends SupportsWrite {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java index e22738d20d507..5031c71c0fd4d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java @@ -29,7 +29,10 @@ interface SupportsRead extends Table { /** * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this - * method to configure each scan. + * method to configure each data source scan. + * + * @param options The options for reading, which is an immutable case-insensitive + * string-to-string map. */ ScanBuilder newScanBuilder(DataSourceOptions options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java new file mode 100644 index 0000000000000..ecdfe20730254 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; + +/** + * An internal base interface of mix-in interfaces for writable {@link Table}. This adds + * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write + * for batch or streaming. + */ +interface SupportsWrite extends Table { + + /** + * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call + * this method to configure each data source write. + */ + WriteBuilder newWriteBuilder(DataSourceOptions options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java similarity index 99% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java index efe1ac4f78db1..91297759971b5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java @@ -38,7 +38,7 @@ * Please refer to the documentation of commit/abort methods for detailed specifications. */ @Evolving -public interface BatchWriteSupport { +public interface BatchWrite { /** * Creates a writer factory which will be serialized and sent to executors. 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index d142ee523ef9f..11228ad1ea672 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -36,11 +36,11 @@ * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data + * {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark may retry this writing task a few times. * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a - * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])} + * different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])} * when the configured number of retries is exhausted. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task @@ -71,11 +71,11 @@ public interface DataWriter { /** * Commits this writer after all records are written successfully, returns a commit message which * will be sent back to driver side and passed to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])}. + * {@link BatchWrite#commit(WriterCommitMessage[])}. * * The written data should only be visible to data source readers after - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method - * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to + * {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method + * should still "hide" the written data and ask the {@link BatchWrite} at driver side to * do the final commit via {@link WriterCommitMessage}. * * If this method fails (by throwing an exception), {@link #abort()} will be called and this @@ -93,7 +93,7 @@ public interface DataWriter { * failed. * * If this method fails(by throwing an exception), the underlying data source may have garbage - * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually, + * that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually, * but these garbage should not be visible to data source readers. * * @throws IOException if failure happens during disk/network IO like writing files. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 65105f46b82d5..bf2db9059b088 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow; /** - * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()}, + * A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()}, * which is responsible for creating and initializing the actual data writer at executor side. 
* * Note that, the writer factory will be serialized and sent to executors, then the data writer diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java new file mode 100644 index 0000000000000..c4295f2371877 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.sql.SaveMode; + +// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before +// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details. +public interface SupportsSaveMode extends WriteBuilder { + WriteBuilder mode(SaveMode mode); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java new file mode 100644 index 0000000000000..e861c72af9e68 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.SupportsBatchWrite; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; + +/** + * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to + * support different ways to write data to data sources. + * + * Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to + * append data without affecting existing data. + */ +@Evolving +public interface WriteBuilder { + + /** + * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. 
It's + * possible that there are many queries running at the same time, or a query is restarted and + * resumed. {@link BatchWrite} can use this id to identify the query. + * + * @return a new builder with the `queryId`. By default it returns `this`, which means the given + * `queryId` is ignored. Please override this method to take the `queryId`. + */ + default WriteBuilder withQueryId(String queryId) { + return this; + } + + /** + * Passes the schema of the input data from Spark to data source. + * + * @return a new builder with the `schema`. By default it returns `this`, which means the given + * `schema` is ignored. Please override this method to take the `schema`. + */ + default WriteBuilder withInputDataSchema(StructType schema) { + return this; + } + + /** + * Returns a {@link BatchWrite} to write data to batch source. By default this method throws + * exception, data sources must overwrite this method to provide an implementation, if the + * {@link Table} that creates this scan implements {@link SupportsBatchWrite}. + * + * Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode, + * to indicate that no writing is needed. We can clean it up after removing + * {@link SupportsSaveMode}. + */ + default BatchWrite buildForBatch() { + throw new UnsupportedOperationException("Batch scans are not supported"); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java index 9216e34399092..6334c8f643098 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -24,7 +24,7 @@ /** * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side - * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or + * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}. 
* * This is an empty interface, data sources should define their own message class and use it when diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ce8e4c8f5b82b..af369a5bca464 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String /** @@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { case _ => provider.getTable(dsOptions) } table match { - case s: SupportsBatchRead => - Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema)) - + case _: SupportsBatchRead => + Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions)) case _ => loadV1Source(paths: _*) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 981b3a8fd4ac1..228dcb94b9acc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode import org.apache.spark.sql.types.StructType /** @@ -241,33 +242,38 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") - val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) - if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { - case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - source, - df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - + val session = df.sparkSession + val cls = DataSource.lookupDataSource(source, session.sessionState.conf) + if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { + case table: SupportsBatchWrite => if (mode == SaveMode.Append) { - val relation = DataSourceV2Relation.createRelationForWrite(source, options) + val relation = DataSourceV2Relation.create(table, options) runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } - } else { - val writer = provider.createBatchWriteSupport( - UUID.randomUUID().toString, - 
df.logicalPlan.output.toStructType, - mode, - new DataSourceOptions(options.asJava)) - - if (writer.isPresent) { - runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get, df.logicalPlan) - } + val writeBuilder = table.newWriteBuilder(dsOptions) + .withQueryId(UUID.randomUUID().toString) + .withInputDataSchema(df.logicalPlan.schema) + writeBuilder match { + case s: SupportsSaveMode => + val write = s.mode(mode).buildForBatch() + // It can only return null with `SupportsSaveMode`. We can clean it up after + // removing `SupportsSaveMode`. + if (write != null) { + runCommand(df.sparkSession, "save") { + WriteToDataSourceV2(write, df.logicalPlan) + } + } + + case _ => throw new AnalysisException( + s"data source ${table.name} does not support SaveMode $mode") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7bf2b8bff3732..6321578184346 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -21,46 +21,51 @@ import java.util.UUID import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport +import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + * and [[WriteBuilder]]. */ case class DataSourceV2Relation( - // TODO: remove `source` when we finish API refactor for write. 
- source: TableProvider, - table: SupportsBatchRead, + table: Table, output: Seq[AttributeReference], - options: Map[String, String], - userSpecifiedSchema: Option[StructType] = None) + options: Map[String, String]) extends LeafNode with MultiInstanceRelation with NamedRelation { - import DataSourceV2Relation._ - override def name: String = table.name() override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } - def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) + def newScanBuilder(): ScanBuilder = table match { + case s: SupportsBatchRead => + val dsOptions = new DataSourceOptions(options.asJava) + s.newScanBuilder(dsOptions) + case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}") + } + - def newScanBuilder(): ScanBuilder = { - val dsOptions = new DataSourceOptions(options.asJava) - table.newScanBuilder(dsOptions) + + def newWriteBuilder(schema: StructType): WriteBuilder = table match { + case s: SupportsBatchWrite => + val dsOptions = new DataSourceOptions(options.asJava) + s.newWriteBuilder(dsOptions) + .withQueryId(UUID.randomUUID().toString) + .withInputDataSchema(schema) + case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") } override def computeStats(): Statistics = { @@ -126,52 +131,8 @@ case class StreamingDataSourceV2Relation( } object DataSourceV2Relation { - private implicit class SourceHelpers(source: DataSourceV2) { - def asWriteSupportProvider: BatchWriteSupportProvider = { - source match { - case provider: BatchWriteSupportProvider => - provider - case _ => - throw new AnalysisException(s"Data source is not writable: $name") - } - } - - def name: String = { - source match { - case registered: DataSourceRegister => - registered.shortName() - case _ => - source.getClass.getSimpleName - } - } - - def createWriteSupport( - options: Map[String, String], - schema: StructType): BatchWriteSupport = { - asWriteSupportProvider.createBatchWriteSupport( - UUID.randomUUID().toString, - schema, - SaveMode.Append, - new DataSourceOptions(options.asJava)).get - } - } - - def create( - provider: TableProvider, - table: SupportsBatchRead, - options: Map[String, String], - userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { + def create(table: Table, options: Map[String, String]): DataSourceV2Relation = { val output = table.schema().toAttributes - DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema) - } - - // TODO: remove this when we finish API refactor for write. 
- def createRelationForWrite( - source: DataSourceV2, - options: Map[String, String]): DataSourceV2Relation = { - val provider = source.asInstanceOf[TableProvider] - val dsOptions = new DataSourceOptions(options.asJava) - val table = provider.getTable(dsOptions) - create(provider, table.asInstanceOf[SupportsBatchRead], options) + DataSourceV2Relation(table, output, options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 2e26fce880b68..79540b0246214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.{sources, Strategy} +import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport +import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode object DataSourceV2Strategy extends Strategy { @@ -110,7 +111,7 @@ object DataSourceV2Strategy extends Strategy { val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" - |Pushing operators to ${relation.source.getClass} + |Pushing operators to ${relation.name} |Pushed Filters: ${pushedFilters.mkString(", ")} |Post-Scan Filters: ${postScanFilters.mkString(",")} |Output: ${output.mkString(", ")} @@ -136,7 +137,14 @@ object DataSourceV2Strategy extends Strategy { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil + val writeBuilder = r.newWriteBuilder(query.schema) + writeBuilder match { + case s: SupportsSaveMode => + val write = s.mode(SaveMode.Append).buildForBatch() + assert(write != null) + WriteToDataSourceV2Exec(write, planLater(query)) :: Nil + case _ => throw new AnalysisException(s"data source ${r.name} does not support SaveMode") + } case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index d7e20eed4cbc0..406fb8c3a3834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{LongAccumulator, Utils} * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. 
*/ @deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan) +case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -44,7 +44,7 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl /** * The physical plan for writing data into data source v2. */ -case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) +case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan) extends UnaryExecNode { var commitProgress: Option[StreamWriterCommitProgress] = None @@ -53,13 +53,13 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = writeSupport.createBatchWriterFactory() - val useCommitCoordinator = writeSupport.useCommitCoordinator + val writerFactory = batchWrite.createBatchWriterFactory() + val useCommitCoordinator = batchWrite.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) val totalNumRowsAccumulator = new LongAccumulator() - logInfo(s"Start processing data source write support: $writeSupport. " + + logInfo(s"Start processing data source write support: $batchWrite. " + s"The input RDD has ${messages.length} partitions.") try { @@ -72,26 +72,26 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark val commitMessage = result.writerCommitMessage messages(index) = commitMessage totalNumRowsAccumulator.add(result.numRows) - writeSupport.onDataWriterCommit(commitMessage) + batchWrite.onDataWriterCommit(commitMessage) } ) - logInfo(s"Data source write support $writeSupport is committing.") - writeSupport.commit(messages) - logInfo(s"Data source write support $writeSupport committed.") + logInfo(s"Data source write support $batchWrite is committing.") + batchWrite.commit(messages) + logInfo(s"Data source write support $batchWrite committed.") commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => - logError(s"Data source write support $writeSupport is aborting.") + logError(s"Data source write support $batchWrite is aborting.") try { - writeSupport.abort(messages) + batchWrite.abort(messages) } catch { case t: Throwable => - logError(s"Data source write support $writeSupport failed to abort.") + logError(s"Data source write support $batchWrite failed to abort.") cause.addSuppressed(t) throw new SparkException("Writing job failed.", cause) } - logError(s"Data source write support $writeSupport aborted.") + logError(s"Data source write support $batchWrite aborted.") cause match { // Only wrap non fatal exceptions. 
case NonFatal(e) => throw new SparkException("Writing job aborted.", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 38ecb0dd12daa..db1bf32a156c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec} -import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} +import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchReadSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} @@ -515,7 +515,7 @@ class MicroBatchExecution( newAttributePlan.schema, outputMode, new DataSourceOptions(extraOptions.asJava)) - WriteToDataSourceV2(new MicroBatchWritSupport(currentBatchId, writer), newAttributePlan) + WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan) case _ => throw new IllegalArgumentException(s"unknown sink type for $sink") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala similarity index 84% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala index 9f88416871f8e..143235efee81d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} /** - * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements + * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped * streaming write support. 
*/ -class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport) - extends BatchWriteSupport { +class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite { override def commit(messages: Array[WriterCommitMessage]): Unit = { writeSupport.commit(eppchId, messages) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index ac3c71cc222b1..fd4cb444ce580 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -21,12 +21,12 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory /** * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery - * to a [[BatchWriteSupport]] on the driver. + * to a [[BatchWrite]] on the driver. * * Note that, because it sends all rows to the driver, this factory will generally be unsuitable * for production-quality sinks. It's intended for use in tests. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index d282193d35d76..c60ea4a2f9f5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -329,8 +329,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { .format(classOf[DataSourceV2WithSessionConfig].getName).load() val options = df.queryExecution.optimizedPlan.collectFirst { case d: DataSourceV2Relation => d.options - } - assert(options.get.get(optionName) == Some("false")) + }.get + assert(options.get(optionName).get == "false") } } @@ -356,13 +356,11 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { val cls = classOf[SimpleWriteOnlyDataSource] val path = file.getCanonicalPath val df = spark.range(5).select('id as 'i, -'id as 'j) - try { - df.write.format(cls.getName).option("path", path).mode("error").save() - df.write.format(cls.getName).option("path", path).mode("overwrite").save() - df.write.format(cls.getName).option("path", path).mode("ignore").save() - } catch { - case e: SchemaReadAttemptException => fail("Schema read was attempted.", e) - } + // non-append mode should not throw exception, as they don't access schema. + df.write.format(cls.getName).option("path", path).mode("error").save() + df.write.format(cls.getName).option("path", path).mode("overwrite").save() + df.write.format(cls.getName).option("path", path).mode("ignore").save() + // append mode will access schema and should throw exception. 
intercept[SchemaReadAttemptException] { df.write.format(cls.getName).option("path", path).mode("append").save() } @@ -680,10 +678,12 @@ object SpecificReaderFactory extends PartitionReaderFactory { class SchemaReadAttemptException(m: String) extends RuntimeException(m) class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { - override def writeSchema(): StructType = { - // This is a bit hacky since this source implements read support but throws - // during schema retrieval. Might have to rewrite but it's done - // such so for minimised changes. - throw new SchemaReadAttemptException("read is not supported") + + override def getTable(options: DataSourceOptions): Table = { + new MyTable(options) { + override def schema(): StructType = { + throw new SchemaReadAttemptException("schema should not be read.") + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 82bb4fa33a3ae..6e4f2bbcd6b61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources.v2 import java.io.{BufferedReader, InputStreamReader, IOException} -import java.util.Optional import scala.collection.JavaConverters._ @@ -35,15 +34,13 @@ import org.apache.spark.util.SerializableConfiguration /** * A HDFS based transactional writable data source. - * Each task writes data to `target/_temporary/queryId/$jobId-$partitionId-$attemptNumber`. - * Each job moves files from `target/_temporary/queryId/` to `target`. + * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`. + * Each job moves files from `target/_temporary/uniqueId/` to `target`. 
*/ class SimpleWritableDataSource extends DataSourceV2 - with TableProvider - with BatchWriteSupportProvider - with SessionConfigSupport { + with TableProvider with SessionConfigSupport { - protected def writeSchema(): StructType = new StructType().add("i", "long").add("j", "long") + private val tableSchema = new StructType().add("i", "long").add("j", "long") override def keyPrefix: String = "simpleWritableDataSource" @@ -68,22 +65,50 @@ class SimpleWritableDataSource extends DataSourceV2 new CSVReaderFactory(serializableConf) } - override def readSchema(): StructType = writeSchema + override def readSchema(): StructType = tableSchema } - override def getTable(options: DataSourceOptions): Table = { - val path = new Path(options.get("path").get()) - val conf = SparkContext.getActive.get.hadoopConfiguration - new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { - new MyScanBuilder(path.toUri.toString, conf) + class MyWriteBuilder(path: String) extends WriteBuilder with SupportsSaveMode { + private var queryId: String = _ + private var mode: SaveMode = _ + + override def withQueryId(queryId: String): WriteBuilder = { + this.queryId = queryId + this + } + + override def mode(mode: SaveMode): WriteBuilder = { + this.mode = mode + this + } + + override def buildForBatch(): BatchWrite = { + assert(mode != null) + + val hadoopPath = new Path(path) + val hadoopConf = SparkContext.getActive.get.hadoopConfiguration + val fs = hadoopPath.getFileSystem(hadoopConf) + + if (mode == SaveMode.ErrorIfExists) { + if (fs.exists(hadoopPath)) { + throw new RuntimeException("data already exists.") + } + } + if (mode == SaveMode.Ignore) { + if (fs.exists(hadoopPath)) { + return null + } + } + if (mode == SaveMode.Overwrite) { + fs.delete(hadoopPath, true) } - override def schema(): StructType = writeSchema + val pathStr = hadoopPath.toUri.toString + new MyBatchWrite(queryId, pathStr, hadoopConf) } } - class WritSupport(queryId: String, path: String, conf: Configuration) extends BatchWriteSupport { + class MyBatchWrite(queryId: String, path: String, conf: Configuration) extends BatchWrite { override def createBatchWriterFactory(): DataWriterFactory = { SimpleCounter.resetCounter new CSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf)) @@ -116,33 +141,23 @@ class SimpleWritableDataSource extends DataSourceV2 } } - override def createBatchWriteSupport( - queryId: String, - schema: StructType, - mode: SaveMode, - options: DataSourceOptions): Optional[BatchWriteSupport] = { - assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false)) + class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite { + private val path = options.get("path").get() + private val conf = SparkContext.getActive.get.hadoopConfiguration - val path = new Path(options.get("path").get()) - val conf = SparkContext.getActive.get.hadoopConfiguration - val fs = path.getFileSystem(conf) + override def schema(): StructType = tableSchema - if (mode == SaveMode.ErrorIfExists) { - if (fs.exists(path)) { - throw new RuntimeException("data already exists.") - } + override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + new MyScanBuilder(new Path(path).toUri.toString, conf) } - if (mode == SaveMode.Ignore) { - if (fs.exists(path)) { - return Optional.empty() - } - } - if (mode == SaveMode.Overwrite) { - fs.delete(path, true) + + override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + new 
MyWriteBuilder(path) } + } - val pathStr = path.toUri.toString - Optional.of(new WritSupport(queryId, pathStr, conf)) + override def getTable(options: DataSourceOptions): Table = { + new MyTable(options) } }
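
To make the shape of the refactored API concrete, here is a minimal write-only source sketched against the interfaces this patch adds or renames (`SupportsBatchWrite`, `WriteBuilder`, `SupportsSaveMode`, `BatchWrite`). It is illustrative only and not part of the patch: every `Example*` name is hypothetical, the writer factory is left unimplemented, and `SupportsSaveMode` is only the temporary mixin that SPARK-26356 will remove. The complete, working equivalent is `SimpleWritableDataSource` in the test changes above.

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite, Table, TableProvider}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsSaveMode, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Hypothetical source: after this patch the Table itself advertises batch write
// support, instead of the provider implementing BatchWriteSupportProvider.
class ExampleSource extends TableProvider {
  override def getTable(options: DataSourceOptions): Table = new ExampleTable
}

class ExampleTable extends Table with SupportsBatchWrite {
  override def name(): String = "example"
  override def schema(): StructType = new StructType().add("i", "long").add("j", "long")

  // Spark creates one builder per write and configures it before building.
  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = new ExampleWriteBuilder
}

class ExampleWriteBuilder extends WriteBuilder with SupportsSaveMode {
  private var queryId: String = _
  private var mode: SaveMode = _

  override def withQueryId(queryId: String): WriteBuilder = { this.queryId = queryId; this }
  override def mode(mode: SaveMode): WriteBuilder = { this.mode = mode; this }

  // With SupportsSaveMode, buildForBatch() may return null (e.g. for Ignore when
  // the data already exists) to signal that no write is needed.
  override def buildForBatch(): BatchWrite = new ExampleBatchWrite
}

class ExampleBatchWrite extends BatchWrite {
  // A real source returns a factory like CSVDataWriterFactory in the test above.
  override def createBatchWriterFactory(): DataWriterFactory = ???
  override def commit(messages: Array[WriterCommitMessage]): Unit = {} // publish the written data
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}  // clean up partial output
}
```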
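
On the driver side, the new call sequence is split between `DataFrameWriter.scala` and `DataSourceV2Strategy.scala` in the diff. The sketch below condenses it into one hypothetical helper (`buildBatchWrite` is not a real Spark method) to show the order of the builder calls; it assumes the options, input schema, and save mode are already known.

```scala
import java.util.UUID

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode}
import org.apache.spark.sql.types.StructType

object WritePathSketch {
  // Condensed, non-literal view of what DataFrameWriter.save() and the AppendData
  // planner rule do after this patch.
  def buildBatchWrite(
      table: SupportsBatchWrite,
      options: DataSourceOptions,
      inputSchema: StructType,
      mode: SaveMode): Option[BatchWrite] = {
    val builder = table.newWriteBuilder(options)
      .withQueryId(UUID.randomUUID().toString)
      .withInputDataSchema(inputSchema)
    builder match {
      case s: SupportsSaveMode =>
        // buildForBatch() may return null only in the SaveMode path,
        // e.g. SaveMode.Ignore when the data already exists.
        Option(s.mode(mode).buildForBatch())
      case _ =>
        // Spark raises an AnalysisException here; simplified for this sketch.
        throw new IllegalStateException(s"data source ${table.name} does not support SaveMode $mode")
    }
  }
}
```

`WriteToDataSourceV2Exec` then consumes the returned `BatchWrite`: it creates the writer factory, runs the `DataWriter`s on the executors, and finally calls `commit` or `abort` with the collected commit messages on the driver.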