diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 28c9853bfea9c..b8a10187aa4a8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -269,8 +269,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def createStreamingWriteSupport( queryId: String, - schema: StructType, - mode: OutputMode, options: DataSourceOptions): StreamingWriteSupport = { import scala.collection.JavaConverters._ @@ -279,10 +277,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) - KafkaWriter.validateQuery( - schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic) - - new KafkaStreamingWriteSupport(topic, producerParams, schema) + new KafkaStreamingWriteSupport(topic, producerParams) } private def strategy(caseInsensitiveParams: Map[String, String]) = diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala index dc19312f79a22..26ad61d6b7ae8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala @@ -21,9 +21,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.streaming.sources.StreamWriteConfig import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery +import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** @@ -38,19 +41,31 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage * @param topic The topic this writer is responsible for. If None, topic will be inferred from * a `topic` field in the incoming data. * @param producerParams Parameters for Kafka producers in each task. - * @param schema The schema of the input data. 
*/ -class KafkaStreamingWriteSupport( - topic: Option[String], producerParams: Map[String, String], schema: StructType) +class KafkaStreamingWriteSupport(topic: Option[String], producerParams: Map[String, String]) extends StreamingWriteSupport { - validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) + override def createWriteConfig( + schema: StructType, + mode: OutputMode, + options: DataSourceOptions): StreamingWriteConfig = { + validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) + StreamWriteConfig(schema, mode, options) + } + + override def createStreamingWriterFactory( + config: StreamingWriteConfig): KafkaStreamWriterFactory = + KafkaStreamWriterFactory(topic, producerParams, config.writeSchema) - override def createStreamingWriterFactory(): KafkaStreamWriterFactory = - KafkaStreamWriterFactory(topic, producerParams, schema) + override def commit( + config: StreamingWriteConfig, + epochId: Long, + messages: Array[WriterCommitMessage]): Unit = {} - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort( + config: StreamingWriteConfig, + epochId: Long, + messages: Array[WriterCommitMessage]): Unit = {} } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java index bd10c3353bf12..ee465e80107c6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java @@ -22,7 +22,6 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport; -import org.apache.spark.sql.types.StructType; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to @@ -35,25 +34,18 @@ public interface BatchWriteSupportProvider extends DataSourceV2 { /** - * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source, - * which is called by Spark at the beginning of each batch query. + * Creates a {@link BatchWriteSupport} instance to save the data to a data source. Called by + * Spark at the beginning of each batch query. * * Data sources can return None if there is no writing needed to be done according to the save * mode. * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, and the returned - * {@link BatchWriteSupport} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. * @param mode the save mode which determines what to do when the data are already in this data * source, please refer to {@link SaveMode} for more details. * @param options the options for the returned data source writer, which is an immutable * case-insensitive string-to-string map. * @return a write support to write data to this data source. 
*/ - Optional createBatchWriteSupport( - String queryId, - StructType schema, - SaveMode mode, - DataSourceOptions options); + // TODO: remove SaveMode + Optional createBatchWriteSupport(SaveMode mode, DataSourceOptions options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java index f9ca85d8089b4..dd19d99606b37 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java @@ -20,8 +20,6 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.execution.streaming.BaseStreamingSink; import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; -import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.types.StructType; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to @@ -40,15 +38,10 @@ public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreami * @param queryId A unique string for the writing query. It's possible that there are many * writing queries running at the same time, and the returned * {@link StreamingWriteSupport} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. - * @param mode the output mode which determines what successive epoch output means to this - * sink, please refer to {@link OutputMode} for more details. * @param options the options for the returned data source writer, which is an immutable * case-insensitive string-to-string map. */ StreamingWriteSupport createStreamingWriteSupport( String queryId, - StructType schema, - OutputMode mode, DataSourceOptions options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchOverwriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchOverwriteSupport.java new file mode 100644 index 0000000000000..a2e8d81b10dea --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchOverwriteSupport.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.sql.catalyst.plans.logical.Filter; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.types.StructType; + +/** + * An interface that adds support to {@link BatchWriteSupport} for a replace data operation that + * replaces a subset of the output table with the output of a write operation. The subset removed is + * determined by a set of filter expressions. + *
+ * Data source implementations can implement this interface in addition to {@link BatchWriteSupport} + * to support idempotent write operations that replace data matched by a set of delete filters with + * the result of the write operation. + *
+ * This is used to build idempotent writes. For example, a query that produces a daily summary + * may be run several times as new data arrives. Each run should replace the output of the last + * run for a particular day in the partitioned output table. Such a job would write using this + * WriteSupport and would pass a filter matching the previous job's output, like + * $"day" === '2018-08-22', to remove that data and commit the replacement data at + * the same time. + */ +public interface BatchOverwriteSupport extends BatchWriteSupport { + /** + * Creates a {@link WriteConfig} for a batch overwrite operation, where the data written replaces + * the data matching the delete filters. + *
+ * Implementations may reject the delete filters if the delete isn't possible without significant + * effort. For example, partitioned data sources may reject deletes that do not filter by + * partition columns because the filter may require rewriting files without deleted records. + * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear + * error message that identifies which expression was rejected. + * + * @param schema schema of the data that will be written + * @param options options to configure the write operation + * @param deleteFilters filters that match data to be replaced by the data written + * @return a new WriteConfig for the replace data (overwrite) operation + * @throws IllegalArgumentException If the delete is rejected due to required effort + */ + // TODO: replace DataSourceOptions with CaseInsensitiveStringMap + WriteConfig createOverwriteConfig(StructType schema, + DataSourceOptions options, + Filter[] deleteFilters); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchPartitionOverwriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchPartitionOverwriteSupport.java new file mode 100644 index 0000000000000..128560d864f0f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchPartitionOverwriteSupport.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.types.StructType; + +/** + * An interface that adds support to {@link BatchWriteSupport} for a replace data operation that + * replaces partitions dynamically with the output of a write operation. + *
+ * Data source implementations can implement this interface in addition to {@link BatchWriteSupport} + * to support write operations that replace all partitions in the output table that are present + * in the write's output data. + *
+ * This is used to implement INSERT OVERWRITE ... PARTITIONS. + */ +public interface BatchPartitionOverwriteSupport extends BatchWriteSupport { + /** + * Creates a {@link WriteConfig} for a batch overwrite operation, where the data partitions + * written by the job replace any partitions that already exist in the output table. + * + * @param schema schema of the data that will be written + * @param options options to configure the write operation + * @return a new WriteConfig for the dynamic partition overwrite operation + */ + // TODO: replace DataSourceOptions with CaseInsensitiveStringMap + WriteConfig createDynamicOverwriteConfig(StructType schema, DataSourceOptions options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java index 0ec9e05d6a02b..e740183fe2258 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java @@ -18,19 +18,21 @@ package org.apache.spark.sql.sources.v2.writer; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.types.StructType; /** * An interface that defines how to write the data to data source for batch processing. * * The writing procedure is: - * 1. Create a writer factory by {@link #createBatchWriterFactory()}, serialize and send it to all - * the partitions of the input data(RDD). + * 1. Create a writer factory by {@link #createBatchWriterFactory(WriteConfig)}, serialize and + * send it to all the partitions of the input data(RDD). * 2. For each partition, create the data writer, and write the data of the partition with this * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If * exception happens during the writing, call {@link DataWriter#abort()}. - * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If - * some writers are aborted, or the job failed with an unknown reason, call - * {@link #abort(WriterCommitMessage[])}. + * 3. If all writers are successfully committed, call + * {@link #commit(WriteConfig,WriterCommitMessage[])}. If some writers are aborted, or the job + * failed with an unknown reason, call {@link #abort(WriteConfig, WriterCommitMessage[])}. * * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should * do it manually in their Spark applications if they want to retry. @@ -40,13 +42,23 @@ @InterfaceStability.Evolving public interface BatchWriteSupport { + /** + * Creates a {@link WriteConfig} for a batch write operation. + * + * @param schema schema of the data that will be written + * @param options options to configure the write operation + * @return a new WriteConfig + */ + // TODO: replace DataSourceOptions with CaseInsensitiveStringMap + WriteConfig createWriteConfig(StructType schema, DataSourceOptions options); + /** * Creates a writer factory which will be serialized and sent to executors. * * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. 
*/ - DataWriterFactory createBatchWriterFactory(); + DataWriterFactory createBatchWriterFactory(WriteConfig config); /** * Returns whether Spark should use the commit coordinator to ensure that at most one task for @@ -54,7 +66,7 @@ public interface BatchWriteSupport { * * @return true if commit coordinator should be used, false otherwise. */ - default boolean useCommitCoordinator() { + default boolean useCommitCoordinator(WriteConfig config) { return true; } @@ -62,31 +74,32 @@ default boolean useCommitCoordinator() { * Handles a commit message on receiving from a successful data writer. * * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. + * failed, and {@link #abort(WriteConfig, WriterCommitMessage[])} would be called. */ - default void onDataWriterCommit(WriterCommitMessage message) {} + default void onDataWriterCommit(WriteConfig config, WriterCommitMessage message) {} /** * Commits this writing job with a list of commit messages. The commit messages are collected from * successful data writers and are produced by {@link DataWriter#commit()}. * * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination - * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. + * failed, and {@link #abort(WriteConfig, WriterCommitMessage[])} would be called. The state of + * the destination is undefined and @{@link #abort(WriteConfig, WriterCommitMessage[])} may not be + * able to deal with it. * * Note that speculative execution may cause multiple tasks to run for a partition. By default, * Spark uses the commit coordinator to allow at most one task to commit. Implementations can - * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple - * tasks may have committed successfully and one successful commit message per task will be - * passed to this commit method. The remaining commit messages are ignored by Spark. + * disable this behavior by overriding {@link #useCommitCoordinator(WriteConfig)}. If disabled, + * multiple tasks may have committed successfully and one successful commit message per task will + * be passed to this commit method. The remaining commit messages are ignored by Spark. */ - void commit(WriterCommitMessage[] messages); + void commit(WriteConfig config, WriterCommitMessage[] messages); /** * Aborts this writing job because some data writers are failed and keep failing when retry, * or the Spark job fails with some unknown reasons, - * or {@link #onDataWriterCommit(WriterCommitMessage)} fails, - * or {@link #commit(WriterCommitMessage[])} fails. + * or {@link #onDataWriterCommit(WriteConfig,WriterCommitMessage)} fails, + * or {@link #commit(WriteConfig,WriterCommitMessage[])} fails. * * If this method fails (by throwing an exception), the underlying data source may require manual * cleanup. @@ -97,5 +110,5 @@ default void onDataWriterCommit(WriterCommitMessage message) {} * driver when the abort is triggered. So this is just a "best effort" for data sources to * clean up the data left by data writers. 
*/ - void abort(WriterCommitMessage[] messages); + void abort(WriteConfig config, WriterCommitMessage[] messages); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 5fb067966ee67..1e26e33a83e5b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -36,11 +36,11 @@ * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data - * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an - * exception will be sent to the driver side, and Spark may retry this writing task a few times. - * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a - * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])} + * {@link BatchWriteSupport#commit(WriteConfig,WriterCommitMessage[])} with commit messages from + * other data writers. If this data writer fails(one record fails to write or {@link #commit()} + * fails), an exception will be sent to the driver side, and Spark may retry this writing task a few + * times. In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a different + * `taskId`. Spark will call {@link BatchWriteSupport#abort(WriteConfig,WriterCommitMessage[])} * when the configured number of retries is exhausted. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task @@ -71,12 +71,12 @@ public interface DataWriter { /** * Commits this writer after all records are written successfully, returns a commit message which * will be sent back to driver side and passed to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])}. + * {@link BatchWriteSupport#commit(WriteConfig, WriterCommitMessage[])}. * * The written data should only be visible to data source readers after - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method - * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to - * do the final commit via {@link WriterCommitMessage}. + * {@link BatchWriteSupport#commit(WriteConfig, WriterCommitMessage[])} succeeds, which means this + * method should still "hide" the written data and ask the {@link BatchWriteSupport} at driver + * side to do the final commit via {@link WriterCommitMessage}. * * If this method fails (by throwing an exception), {@link #abort()} will be called and this * data writer is considered to have been failed. @@ -93,8 +93,8 @@ public interface DataWriter { * failed. * * If this method fails(by throwing an exception), the underlying data source may have garbage - * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually, - * but these garbage should not be visible to data source readers. + * that need to be cleaned by {@link BatchWriteSupport#abort(WriteConfig, WriterCommitMessage[])} + * or manually, but these garbage should not be visible to data source readers. * * @throws IOException if failure happens during disk/network IO like writing files. 
*/ diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 19a36dd232456..8abd6e7fbaee1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.InternalRow; /** - * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()}, + * A factory of {@link DataWriter} returned by + * {@link BatchWriteSupport#createBatchWriterFactory(WriteConfig)}, * which is responsible for creating and initializing the actual data writer at executor side. * * Note that, the writer factory will be serialized and sent to executors, then the data writer diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteConfig.java new file mode 100644 index 0000000000000..5724aee5812f8 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.plans.logical.Filter; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * WriteConfig carries configuration specific to a write operation for a table. + *
+ * WriteConfig is created when Spark initializes a v2 write operation using factory methods provided + * by the write support instance for the query type: + *
+ * This class is passed to create a {@link DataWriterFactory} and is passed to commit and abort. + */ +@InterfaceStability.Evolving +public interface WriteConfig { + /** + * The schema of rows that will be written. + */ + StructType writeSchema(); + + /** + * Options to configure the write. + */ + DataSourceOptions writeOptions(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java index 123335c414e9f..d1c337faea1d4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -19,13 +19,14 @@ import java.io.Serializable; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteConfig; import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport; import org.apache.spark.annotation.InterfaceStability; /** * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side - * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or - * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}. + * as the input parameter of {@link BatchWriteSupport#commit(WriteConfig,WriterCommitMessage[])} or + * {@link StreamingWriteSupport#commit(StreamingWriteConfig, long, WriterCommitMessage[])}. * * This is an empty interface, data sources should define their own message class and use it when * generating messages at executor side and handling the messages at driver side. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java index a4da24fc5ae68..39c802fd82788 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java @@ -26,8 +26,8 @@ /** * A factory of {@link DataWriter} returned by - * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating - * and initializing the actual data writer at executor side. + * {@link StreamingWriteSupport#createStreamingWriterFactory(StreamingWriteConfig)}, which is + * responsible for creating and initializing the actual data writer at executor side. * * Note that, the writer factory will be serialized and sent to executors, then the data writer * will be created on executors and do the actual writing. So this interface must be diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteConfig.java new file mode 100644 index 0000000000000..f4709298796a7 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.sql.sources.v2.writer.WriteConfig; +import org.apache.spark.sql.streaming.OutputMode; + +/** + * A {@link WriteConfig} for streaming writes. + */ +public interface StreamingWriteConfig extends WriteConfig { + /** + * Returns the streaming output mode for this write. + */ + OutputMode outputMode(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java index 3fdfac5e1c84a..6b849b8b9a122 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java @@ -18,8 +18,11 @@ package org.apache.spark.sql.sources.v2.writer.streaming; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; /** * An interface that defines how to write the data to data source for streaming processing. @@ -30,13 +33,25 @@ @InterfaceStability.Evolving public interface StreamingWriteSupport { + /** + * Creates a {@link StreamingWriteConfig} for a streaming write operation. + * + * @param schema schema of the data that will be written + * @param mode output mode for the streaming write + * @param options options to configure the write operation + * @return a new WriteConfig + */ + // TODO: replace DataSourceOptions with CaseInsensitiveStringMap + StreamingWriteConfig createWriteConfig( + StructType schema, OutputMode mode, DataSourceOptions options); + /** * Creates a writer factory which will be serialized and sent to executors. * * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. */ - StreamingDataWriterFactory createStreamingWriterFactory(); + StreamingDataWriterFactory createStreamingWriterFactory(StreamingWriteConfig config); /** * Commits this writing job for the specified epoch with a list of commit messages. The commit @@ -45,18 +60,18 @@ public interface StreamingWriteSupport { * * If this method fails (by throwing an exception), this writing job is considered to have been * failed, and the execution engine will attempt to call - * {@link #abort(long, WriterCommitMessage[])}. + * {@link #abort(StreamingWriteConfig, long, WriterCommitMessage[])}. * * The execution engine may call `commit` multiple times for the same epoch in some circumstances. * To support exactly-once data semantics, implementations must ensure that multiple commits for * the same epoch are idempotent. 
*/ - void commit(long epochId, WriterCommitMessage[] messages); + void commit(StreamingWriteConfig config, long epochId, WriterCommitMessage[] messages); /** * Aborts this writing job because some data writers are failed and keep failing when retried, or - * the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])} - * fails. + * the Spark job fails with some unknown reasons, or + * {@link #commit(StreamingWriteConfig, long, WriterCommitMessage[])} fails. * * If this method fails (by throwing an exception), the underlying data source may require manual * cleanup. @@ -67,5 +82,5 @@ public interface StreamingWriteSupport { * driver when the abort is triggered. So this is just a "best effort" for data sources to * clean up the data left by data writers. */ - void abort(long epochId, WriterCommitMessage[] messages); + void abort(StreamingWriteConfig config, long epochId, WriterCommitMessage[] messages); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index eca2d5b971905..c48dec63771d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.util.{Locale, Properties, UUID} +import java.util.{Locale, Properties} import scala.collection.JavaConverters._ @@ -245,21 +245,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) val relation = DataSourceV2Relation.create(source, options.toMap) + if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } } else { - val writer = provider.createBatchWriteSupport( - UUID.randomUUID().toString, - df.logicalPlan.output.toStructType, - mode, - new DataSourceOptions(options.asJava)) + val v2Options = new DataSourceOptions(options.asJava) + val maybeWriter = provider.createBatchWriteSupport(mode, v2Options) - if (writer.isPresent) { + if (maybeWriter.isPresent) { + val writer = maybeWriter.get + val writeConfig = writer.createWriteConfig(df.schema, v2Options) runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get, df.logicalPlan) + WriteToDataSourceV2(writer, writeConfig, df.logicalPlan) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchWriteConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchWriteConfig.scala new file mode 100644 index 0000000000000..a6831cb677755 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchWriteConfig.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.writer.WriteConfig +import org.apache.spark.sql.types.StructType + +/** + * A generic [[WriteConfig]] implementation for batch writer implementations. + */ +case class BatchWriteConfig(writeSchema: StructType, writeOptions: DataSourceOptions) + extends WriteConfig diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index f7e29593a6353..7e465c211b2fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, SaveMode} @@ -58,7 +56,7 @@ case class DataSourceV2Relation( override def simpleString: String = "RelationV2 " + metadataString - def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) + def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options) override def computeStats(): Statistics = readSupport match { case r: SupportsReportStatistics => @@ -158,12 +156,8 @@ object DataSourceV2Relation { } } - def createWriteSupport( - options: Map[String, String], - schema: StructType): BatchWriteSupport = { + def createWriteSupport(options: Map[String, String]): BatchWriteSupport = { asWriteSupportProvider.createBatchWriteSupport( - UUID.randomUUID().toString, - schema, SaveMode.Append, new DataSourceOptions(options.asJava)).get } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index fe713ff6c7850..5f782505eccd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.{sources, Strategy} @@ -26,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Rep import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport @@ -143,14 +145,17 @@ object DataSourceV2Strategy extends Strategy { DataSourceV2ScanExec( r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil - case WriteToDataSourceV2(writer, query) => - WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil + case WriteToDataSourceV2(writer, writeConfig, query) => + WriteToDataSourceV2Exec(writer, 
writeConfig, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil + val writeSupport = r.newWriteSupport() + val writeConfig = writeSupport.createWriteConfig( + query.schema, new DataSourceOptions(r.options.asJava)) + WriteToDataSourceV2Exec(r.newWriteSupport(), writeConfig, planLater(query)) :: Nil - case WriteToContinuousDataSource(writer, query) => - WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil + case WriteToContinuousDataSource(writer, config, query) => + WriteToContinuousDataSourceExec(writer, config, planLater(query)) :: Nil case Repartition(1, false, child) => val isContinuous = child.find { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c3f7b690ef636..1c36dfe451a61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -35,7 +35,10 @@ import org.apache.spark.util.Utils * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. */ @deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan) +case class WriteToDataSourceV2( + writeSupport: BatchWriteSupport, + writeConfig: WriteConfig, + query: LogicalPlan) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -44,15 +47,18 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl /** * The physical plan for writing data into data source v2. 
*/ -case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) +case class WriteToDataSourceV2Exec( + writeSupport: BatchWriteSupport, + writeConfig: WriteConfig, + query: SparkPlan) extends SparkPlan { override def children: Seq[SparkPlan] = Seq(query) override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = writeSupport.createBatchWriterFactory() - val useCommitCoordinator = writeSupport.useCommitCoordinator + val writerFactory = writeSupport.createBatchWriterFactory(writeConfig) + val useCommitCoordinator = writeSupport.useCommitCoordinator(writeConfig) val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) @@ -67,18 +73,18 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark rdd.partitions.indices, (index, message: WriterCommitMessage) => { messages(index) = message - writeSupport.onDataWriterCommit(message) + writeSupport.onDataWriterCommit(writeConfig, message) } ) logInfo(s"Data source write support $writeSupport is committing.") - writeSupport.commit(messages) + writeSupport.commit(writeConfig, messages) logInfo(s"Data source write support $writeSupport committed.") } catch { case cause: Throwable => logError(s"Data source write support $writeSupport is aborting.") try { - writeSupport.abort(messages) + writeSupport.abort(writeConfig, messages) } catch { case t: Throwable => logError(s"Data source write support $writeSupport failed to abort.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index cf83ba7436d17..de60b15378844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} -import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} +import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWriteSupport, RateControlMicroBatchReadSupport} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} @@ -496,12 +496,11 @@ class MicroBatchExecution( val triggerLogicalPlan = sink match { case _: Sink => newAttributePlan case s: StreamingWriteSupportProvider => - val writer = s.createStreamingWriteSupport( - s"$runId", - newAttributePlan.schema, - outputMode, - new DataSourceOptions(extraOptions.asJava)) - WriteToDataSourceV2(new MicroBatchWritSupport(currentBatchId, writer), newAttributePlan) + val options = new DataSourceOptions(extraOptions.asJava) + val writer = s.createStreamingWriteSupport(s"$runId", options) + val writeConfig = writer.createWriteConfig(newAttributePlan.schema, outputMode, options) + WriteToDataSourceV2( + new MicroBatchWriteSupport(currentBatchId, writer), writeConfig, newAttributePlan) case _ => throw new IllegalArgumentException(s"unknown sink type for 
$sink") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 417b6b39366ae..506145c19cc79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec} -import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport +import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriteSupport import org.apache.spark.sql.sources.v2.CustomMetrics import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics} @@ -242,7 +242,7 @@ trait ProgressReporter extends Logging { case p if p.isInstanceOf[WriteToDataSourceV2Exec] => p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport }.headOption match { - case Some(w: MicroBatchWritSupport) => Some(w.writeSupport) + case Some(w: MicroBatchWriteSupport) => Some(w.writeSupport) case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 9c5c16f4f5d13..ab25e48ae3a57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.execution.streaming.sources.ConsoleWriteSupport import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider} import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport -import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) @@ -37,10 +36,8 @@ class ConsoleSinkProvider extends DataSourceV2 override def createStreamingWriteSupport( queryId: String, - schema: StructType, - mode: OutputMode, options: DataSourceOptions): StreamingWriteSupport = { - new ConsoleWriteSupport(schema, options) + new ConsoleWriteSupport(options) } def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 4ddebb33b79d1..67e27f922c1ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -185,12 +185,10 @@ class ContinuousExecution( "CurrentTimestamp and CurrentDate not yet supported for continuous processing") } - val writer = sink.createStreamingWriteSupport( - s"$runId", - triggerLogicalPlan.schema, - outputMode, - new DataSourceOptions(extraOptions.asJava)) - val withSink = 
WriteToContinuousDataSource(writer, triggerLogicalPlan) + val options = new DataSourceOptions(extraOptions.asJava) + val writer = sink.createStreamingWriteSupport(s"$runId", options) + val writeConfig = writer.createWriteConfig(triggerLogicalPlan.schema, outputMode, options) + val withSink = WriteToContinuousDataSource(writer, writeConfig, triggerLogicalPlan) reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( @@ -224,7 +222,8 @@ class ContinuousExecution( // Use the parent Spark session for the endpoint since it's where this query ID is registered. val epochEndpoint = EpochCoordinatorRef.create( - writer, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get) + writer, writeConfig, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, + SparkEnv.get) val epochUpdateThread = new Thread(new Runnable { override def run: Unit = { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index 2238ce26e7b46..6d1f1ab8de92d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeR import org.apache.spark.sql.SparkSession import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage -import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport} import org.apache.spark.util.RpcUtils private[continuous] sealed trait EpochCoordinatorMessage extends Serializable @@ -83,6 +83,7 @@ private[sql] object EpochCoordinatorRef extends Logging { */ def create( writeSupport: StreamingWriteSupport, + writeConfig: StreamingWriteConfig, readSupport: ContinuousReadSupport, query: ContinuousExecution, epochCoordinatorId: String, @@ -90,7 +91,7 @@ private[sql] object EpochCoordinatorRef extends Logging { session: SparkSession, env: SparkEnv): RpcEndpointRef = synchronized { val coordinator = new EpochCoordinator( - writeSupport, readSupport, query, startEpoch, session, env.rpcEnv) + writeSupport, writeConfig, readSupport, query, startEpoch, session, env.rpcEnv) val ref = env.rpcEnv.setupEndpoint(endpointName(epochCoordinatorId), coordinator) logInfo("Registered EpochCoordinator endpoint") ref @@ -116,6 +117,7 @@ private[sql] object EpochCoordinatorRef extends Logging { */ private[continuous] class EpochCoordinator( writeSupport: StreamingWriteSupport, + writeConfig: StreamingWriteConfig, readSupport: ContinuousReadSupport, query: ContinuousExecution, startEpoch: Long, @@ -198,7 +200,7 @@ private[continuous] class EpochCoordinator( s"and is ready to be committed. Committing epoch $epoch.") // Sequencing is important here. We must commit to the writer before recording the commit // in the query, or we will end up dropping the commit if we restart in the middle. 
-    writeSupport.commit(epoch, messages.toArray)
+    writeSupport.commit(writeConfig, epoch, messages.toArray)
     query.commit(epoch)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
index 7ad21cc304e7c..6e89feb875689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport}
 
 /**
  * The logical plan for writing data in a continuous stream.
  */
 case class WriteToContinuousDataSource(
-    writeSupport: StreamingWriteSupport, query: LogicalPlan) extends LogicalPlan {
+    writeSupport: StreamingWriteSupport,
+    config: StreamingWriteConfig,
+    query: LogicalPlan) extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index c216b61383856..dabfe729336b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -26,18 +26,21 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport}
 
 /**
  * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]].
  */
-case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)
+case class WriteToContinuousDataSourceExec(
+    writeSupport: StreamingWriteSupport,
+    writeConfig: StreamingWriteConfig,
+    query: SparkPlan)
   extends SparkPlan with Logging {
   override def children: Seq[SparkPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val writerFactory = writeSupport.createStreamingWriterFactory()
+    val writerFactory = writeSupport.createStreamingWriterFactory(writeConfig)
     val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
 
     logInfo(s"Start processing data source write support: $writeSupport. " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
index 833e62f35ede1..95670a1ed5ca9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
@@ -22,13 +22,18 @@ import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /** Common methods used to create writes for the console sink */
-class ConsoleWriteSupport(schema: StructType, options: DataSourceOptions)
+class ConsoleWriteSupport(options: DataSourceOptions)
     extends StreamingWriteSupport with Logging {
 
+  override def createWriteConfig(schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamingWriteConfig = StreamWriteConfig(schema, mode, options)
+
   // Number of rows to display, by default 20 rows
   protected val numRowsToShow = options.getInt("numRows", 20)
 
@@ -38,15 +43,22 @@ class ConsoleWriteSupport(schema: StructType, options: DataSourceOptions)
   assert(SparkSession.getActiveSession.isDefined)
   protected val spark = SparkSession.getActiveSession.get
 
-  def createStreamingWriterFactory(): StreamingDataWriterFactory = PackedRowWriterFactory
+  override def createStreamingWriterFactory(
+      config: StreamingWriteConfig): StreamingDataWriterFactory = PackedRowWriterFactory
 
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+  override def commit(
+      config: StreamingWriteConfig,
+      epochId: Long,
+      messages: Array[WriterCommitMessage]): Unit = {
     // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
     // behavior.
-    printRows(messages, schema, s"Batch: $epochId")
+    printRows(messages, config.writeSchema, s"Batch: $epochId")
   }
 
-  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  def abort(
+      config: StreamingWriteConfig,
+      epochId: Long,
+      messages: Array[WriterCommitMessage]): Unit = {}
 
   protected def printRows(
       commitMessages: Array[WriterCommitMessage],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
index 4218fd51ad206..6ad7e16ab1efc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -44,18 +44,30 @@ case class ForeachWriteSupportProvider[T](
 
   override def createStreamingWriteSupport(
       queryId: String,
-      schema: StructType,
-      mode: OutputMode,
       options: DataSourceOptions): StreamingWriteSupport = {
     new StreamingWriteSupport {
-      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+      override def createWriteConfig(
+          schema: StructType,
+          mode: OutputMode,
+          options: DataSourceOptions): StreamingWriteConfig =
+        StreamWriteConfig(schema, mode, options)
 
-      override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+      override def commit(
+          config: StreamingWriteConfig,
+          epochId: Long,
+          messages: Array[WriterCommitMessage]): Unit = {}
+
+      override def abort(
+          config: StreamingWriteConfig,
+          epochId: Long,
+          messages: Array[WriterCommitMessage]): Unit = {}
+
+      override def createStreamingWriterFactory(
+          config: StreamingWriteConfig): StreamingDataWriterFactory = {
         val rowConverter: InternalRow => T = converter match {
           case Left(enc) =>
             val boundEnc = enc.resolveAndBind(
-              schema.toAttributes,
+              config.writeSchema.toAttributes,
               SparkSession.getActiveSession.get.sessionState.analyzer)
             boundEnc.fromRow
           case Right(func) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriteSupport.scala
similarity index 54%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriteSupport.scala
index 9f88416871f8e..b129e95a90010 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriteSupport.scala
@@ -18,27 +18,38 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriteConfig, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport}
+import org.apache.spark.sql.types.StructType
 
 /**
  * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements
 * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
 * streaming write support.
 */
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
+class MicroBatchWriteSupport(epochId: Long, val writeSupport: StreamingWriteSupport)
   extends BatchWriteSupport {
 
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.commit(eppchId, messages)
+  override def createWriteConfig(
+      schema: StructType,
+      options: DataSourceOptions): WriteConfig =
+    // The write config must be created by the wrapped streaming write support instead of this
+    // class, because it is passed back to that write support, which requires an output mode.
+    throw new UnsupportedOperationException(
+      "WriteConfig for micro-batch must be created by the wrapped StreamingWriteSupport")
+
+  override def commit(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit = {
+    writeSupport.commit(config.asInstanceOf[StreamingWriteConfig], epochId, messages)
   }
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.abort(eppchId, messages)
+  override def abort(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit = {
+    writeSupport.abort(config.asInstanceOf[StreamingWriteConfig], epochId, messages)
   }
 
-  override def createBatchWriterFactory(): DataWriterFactory = {
-    new MicroBatchWriterFactory(eppchId, writeSupport.createStreamingWriterFactory())
+  override def createBatchWriterFactory(config: WriteConfig): DataWriterFactory = {
+    new MicroBatchWriterFactory(epochId,
+      writeSupport.createStreamingWriterFactory(config.asInstanceOf[StreamingWriteConfig]))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/StreamWriteConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/StreamWriteConfig.scala
new file mode 100644
index 0000000000000..cdc7ef0eaadbe
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/StreamWriteConfig.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteConfig
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A generic [[StreamingWriteConfig]] implementation for streaming writers.
+ */
+case class StreamWriteConfig(
+    writeSchema: StructType,
+    outputMode: OutputMode,
+    writeOptions: DataSourceOptions) extends StreamingWriteConfig
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 2509450f0da9d..84a4d42923799 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Comp
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -50,10 +50,8 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
 
   override def createStreamingWriteSupport(
       queryId: String,
-      schema: StructType,
-      mode: OutputMode,
       options: DataSourceOptions): StreamingWriteSupport = {
-    new MemoryStreamingWriteSupport(this, mode, schema)
+    new MemoryStreamingWriteSupport(this)
   }
 
   private case class AddedData(batchId: Long, data: Array[Row])
@@ -134,24 +132,33 @@ class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
   override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
 }
 
-class MemoryStreamingWriteSupport(
-    val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
+class MemoryStreamingWriteSupport(val sink: MemorySinkV2)
   extends StreamingWriteSupport with SupportsCustomWriterMetrics {
 
   private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)
 
-  override def createStreamingWriterFactory: MemoryWriterFactory = {
-    MemoryWriterFactory(outputMode, schema)
+  override def createWriteConfig(schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamingWriteConfig = StreamWriteConfig(schema, mode, options)
+
+  override def createStreamingWriterFactory(config: StreamingWriteConfig): MemoryWriterFactory = {
+    MemoryWriterFactory(config.outputMode, config.writeSchema)
   }
 
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+  override def commit(
+      config: StreamingWriteConfig,
+      epochId: Long,
+      messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
       case message: MemoryWriterCommitMessage => message.data
     }
-    sink.write(epochId, outputMode, newRows)
+    sink.write(epochId, config.outputMode, newRows)
  }
 
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+  override def abort(
+      config: StreamingWriteConfig,
+      epochId: Long,
+      messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 50f13bee251ea..c98eac1b379a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -22,10 +22,12 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.sources._
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
 import org.apache.spark.sql.types.StructType
 
 class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
+  val emptyOptions = new DataSourceOptions(new java.util.HashMap[String, String]())
   test("data writer") {
     val partition = 1234
     val writer = new MemoryDataWriter(
@@ -43,9 +45,10 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
 
   test("streaming writer") {
     val sink = new MemorySinkV2
-    val writeSupport = new MemoryStreamingWriteSupport(
-      sink, OutputMode.Append(), new StructType().add("i", "int"))
-    writeSupport.commit(0,
+    val writeSupport = new MemoryStreamingWriteSupport(sink)
+    val writeConfig = writeSupport.createWriteConfig(
+      new StructType().add("i", "int"), OutputMode.Append(), emptyOptions)
+    writeSupport.commit(writeConfig, 0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
         MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
@@ -53,7 +56,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
       ))
     assert(sink.latestBatchId.contains(0))
     assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
-    writeSupport.commit(19,
+    writeSupport.commit(writeConfig, 19,
       Array(
         MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
         MemoryWriterCommitMessage(0, Seq(Row(33)))
@@ -67,10 +70,10 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("writer metrics") {
     val sink = new MemorySinkV2
     val schema = new StructType().add("i", "int")
-    val writeSupport = new MemoryStreamingWriteSupport(
-      sink, OutputMode.Append(), schema)
+    val writeSupport = new MemoryStreamingWriteSupport(sink)
+    val writeConfig = writeSupport.createWriteConfig(schema, OutputMode.Append(), emptyOptions)
     // batch 0
-    writeSupport.commit(0,
+    writeSupport.commit(writeConfig, 0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
         MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
@@ -78,7 +81,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
       ))
     assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":6}")
     // batch 1
-    writeSupport.commit(1,
+    writeSupport.commit(writeConfig, 1,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(7), Row(8)))
       ))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 952241b0b6be5..893e7dd3d0903 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.sources.v2
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
-import java.util.Optional
+import java.util.{Optional, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.v2.BatchWriteConfig
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -68,17 +69,21 @@ class SimpleWritableDataSource extends DataSourceV2
     }
   }
 
-  class WritSupport(queryId: String, path: String, conf: Configuration) extends BatchWriteSupport {
-    override def createBatchWriterFactory(): DataWriterFactory = {
+  class WriteSupport(queryId: String, path: String, conf: Configuration) extends BatchWriteSupport {
+    override def createWriteConfig(
+        schema: StructType,
+        options: DataSourceOptions): WriteConfig = BatchWriteConfig(schema, options)
+
+    override def createBatchWriterFactory(config: WriteConfig): DataWriterFactory = {
      SimpleCounter.resetCounter
      new CSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf))
    }
 
-    override def onDataWriterCommit(message: WriterCommitMessage): Unit = {
+    override def onDataWriterCommit(config: WriteConfig, message: WriterCommitMessage): Unit = {
      SimpleCounter.increaseCounter
    }
 
-    override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    override def commit(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit = {
      val finalPath = new Path(path)
      val jobPath = new Path(new Path(finalPath, "_temporary"), queryId)
      val fs = jobPath.getFileSystem(conf)
@@ -94,7 +99,7 @@ class SimpleWritableDataSource extends DataSourceV2
      }
    }
 
-    override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    override def abort(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit = {
      val jobPath = new Path(new Path(path, "_temporary"), queryId)
      val fs = jobPath.getFileSystem(conf)
      fs.delete(jobPath, true)
@@ -108,8 +113,6 @@ class SimpleWritableDataSource extends DataSourceV2
  }
 
  override def createBatchWriteSupport(
-      queryId: String,
-      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[BatchWriteSupport] = {
    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
@@ -134,7 +137,7 @@ class SimpleWritableDataSource extends DataSourceV2
    }
 
    val pathStr = path.toUri.toString
-    Optional.of(new WritSupport(queryId, pathStr, conf))
+    Optional.of(new WriteSupport(UUID.randomUUID.toString, pathStr, conf))
  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d6819eacd07ca..65bd853fe74c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousReadSupport, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport}
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
@@ -44,6 +44,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
     super.beforeEach()
     epochEndpoint = EpochCoordinatorRef.create(
       mock[StreamingWriteSupport],
+      mock[StreamingWriteConfig],
       mock[ContinuousReadSupport],
       mock[ContinuousExecution],
       coordinatorId,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index 3c973d8ebc704..0e48c9f4c9f5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport}
 import org.apache.spark.sql.test.TestSparkSession
 
 class EpochCoordinatorSuite
@@ -46,14 +46,15 @@ class EpochCoordinatorSuite
 
   override def beforeEach(): Unit = {
     val reader = mock[ContinuousReadSupport]
+    val writeConfig = mock[StreamingWriteConfig]
     writeSupport = mock[StreamingWriteSupport]
     query = mock[ContinuousExecution]
     orderVerifier = inOrder(writeSupport, query)
 
     spark = new TestSparkSession()
 
-    epochCoordinator
-      = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
+    epochCoordinator = EpochCoordinatorRef.create(
+      writeSupport, writeConfig, reader, query, "test", 1, spark, SparkEnv.get)
   }
 
   test("single epoch") {
@@ -209,12 +210,12 @@
   }
 
   private def verifyCommit(epoch: Long): Unit = {
-    orderVerifier.verify(writeSupport).commit(eqTo(epoch), any())
+    orderVerifier.verify(writeSupport).commit(any(), eqTo(epoch), any())
     orderVerifier.verify(query).commit(epoch)
   }
 
   private def verifyNoCommitFor(epoch: Long): Unit = {
-    verify(writeSupport, never()).commit(eqTo(epoch), any())
+    verify(writeSupport, never()).commit(any(), eqTo(epoch), any())
     verify(query, never()).commit(epoch)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index aeef4c8fe9332..c1f915d92dfd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -68,8 +68,6 @@ trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
 trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
   override def createStreamingWriteSupport(
       queryId: String,
-      schema: StructType,
-      mode: OutputMode,
       options: DataSourceOptions): StreamingWriteSupport = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
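
Usage sketch (not part of the patch): after this refactor the schema and output mode no longer live on the write-support instance; the sink builds a StreamingWriteConfig itself, and Spark threads that same config back through factory creation and each epoch's commit/abort. A minimal driver-side illustration, modeled on the MemorySinkV2Suite changes above; the schema, options, epoch id, and sample rows are arbitrary values chosen for the example, and it assumes a build that includes this change:

// Illustrative only: exercises the new config-threading contract using the in-memory sink
// touched by this patch (e.g. from spark-shell).
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryStreamingWriteSupport, MemoryWriterCommitMessage}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

val sink = new MemorySinkV2
val writeSupport = new MemoryStreamingWriteSupport(sink)

// The sink creates its own write config; schema and mode are no longer constructor state.
val config = writeSupport.createWriteConfig(
  new StructType().add("i", "int"),
  OutputMode.Append(),
  new DataSourceOptions(new java.util.HashMap[String, String]()))

// The same config is handed back when creating the writer factory and when committing an epoch.
val factory = writeSupport.createStreamingWriterFactory(config)
writeSupport.commit(config, 0L, Array(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2)))))
assert(sink.latestBatchData.map(_.getInt(0)) == Seq(1, 2))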