[SPARK-25188][SQL] Add WriteConfig to v2 write API. #22190

Closed · wants to merge 3 commits (changes shown from 2 commits)

@@ -269,8 +269,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

override def createStreamingWriteSupport(
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceOptions): StreamingWriteSupport = {
import scala.collection.JavaConverters._

@@ -279,10 +277,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

KafkaWriter.validateQuery(
Contributor Author: This query validation happens in KafkaStreamingWriteSupport. It was duplicated here and in that class. Now, it happens just once when creating the write config.

schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)

new KafkaStreamingWriteSupport(topic, producerParams, schema)
new KafkaStreamingWriteSupport(topic, producerParams)
}

private def strategy(caseInsensitiveParams: Map[String, String]) =
@@ -21,9 +21,12 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.streaming.sources.StreamWriteConfig
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteConfig, StreamingWriteSupport}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
@@ -38,19 +41,31 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
class KafkaStreamingWriteSupport(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
class KafkaStreamingWriteSupport(topic: Option[String], producerParams: Map[String, String])
extends StreamingWriteSupport {

validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
override def createWriteConfig(
schema: StructType,
mode: OutputMode,
options: DataSourceOptions): StreamingWriteConfig = {
validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
StreamWriteConfig(schema, mode, options)
}

override def createStreamingWriterFactory(
config: StreamingWriteConfig): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, config.writeSchema)

override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)
override def commit(
config: StreamingWriteConfig,
epochId: Long,
messages: Array[WriterCommitMessage]): Unit = {}

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def abort(
config: StreamingWriteConfig,
epochId: Long,
messages: Array[WriterCommitMessage]): Unit = {}
}

/**
@@ -22,7 +22,6 @@
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -35,25 +34,18 @@
public interface BatchWriteSupportProvider extends DataSourceV2 {

/**
* Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
* which is called by Spark at the beginning of each batch query.
* Creates a {@link BatchWriteSupport} instance to save the data to a data source. Called by
* Spark at the beginning of each batch query.
*
* Data sources can return None if there is no writing needed to be done according to the save
* mode.
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
* {@link BatchWriteSupport} can use this id to distinguish itself from others.
* @param schema the schema of the data to be written.
* @param mode the save mode which determines what to do when the data are already in this data
* source, please refer to {@link SaveMode} for more details.
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
* @return a write support to write data to this data source.
*/
Optional<BatchWriteSupport> createBatchWriteSupport(
String queryId,
StructType schema,
SaveMode mode,
DataSourceOptions options);
// TODO: remove SaveMode
Optional<BatchWriteSupport> createBatchWriteSupport(SaveMode mode, DataSourceOptions options);
}
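
A minimal Scala sketch of a provider written against this narrowed signature. ExampleSource, newWriteSupport, and tableAlreadyExists are hypothetical names used only for illustration; only the createBatchWriteSupport signature comes from this PR.

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{BatchWriteSupportProvider, DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport

abstract class ExampleSource extends DataSourceV2 with BatchWriteSupportProvider {

  // Left abstract: building the source's actual BatchWriteSupport is out of scope here.
  protected def newWriteSupport(options: DataSourceOptions): BatchWriteSupport

  // Hypothetical existence check; a real source would consult its catalog or storage.
  protected def tableAlreadyExists(options: DataSourceOptions): Boolean

  override def createBatchWriteSupport(
      mode: SaveMode,
      options: DataSourceOptions): Optional[BatchWriteSupport] = mode match {
    // Nothing to write: the table exists and Ignore means leave it untouched.
    case SaveMode.Ignore if tableAlreadyExists(options) => Optional.empty[BatchWriteSupport]()
    case SaveMode.ErrorIfExists if tableAlreadyExists(options) =>
      throw new IllegalStateException("Table already exists")
    case _ => Optional.of(newWriteSupport(options))
  }
}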
@@ -20,8 +20,6 @@
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -40,15 +38,10 @@ public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreami
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
* {@link StreamingWriteSupport} can use this id to distinguish itself from others.
* @param schema the schema of the data to be written.
* @param mode the output mode which determines what successive epoch output means to this
* sink, please refer to {@link OutputMode} for more details.
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
StreamingWriteSupport createStreamingWriteSupport(
String queryId,
StructType schema,
OutputMode mode,
DataSourceOptions options);
}
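
For context, a rough Scala sketch of how the driver-side call sequence changes under this PR: schema and output mode no longer go to the provider, but to the write config created on the returned StreamingWriteSupport. This is illustrative glue code, not part of the PR.

import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteConfig, StreamingWriteSupport}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object StreamingWriteSketch {

  def prepareStreamingWrite(
      provider: StreamingWriteSupportProvider,
      queryId: String,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): (StreamingWriteSupport, StreamingWriteConfig) = {
    // The provider now only needs the query id and the options...
    val writeSupport = provider.createStreamingWriteSupport(queryId, options)
    // ...schema and output mode are handed over when the write config is created.
    val config = writeSupport.createWriteConfig(schema, mode, options)
    // Subsequent calls pass the config back in, e.g.
    //   writeSupport.createStreamingWriterFactory(config)
    //   writeSupport.commit(config, epochId, messages)
    (writeSupport, config)
  }
}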
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;

/**
* An interface that adds support to {@link BatchWriteSupport} for a replace data operation that
* replaces a subset of the output table with the output of a write operation. The subset removed is
* determined by a set of filter expressions.
* <p>
* Data source implementations can implement this interface in addition to {@link BatchWriteSupport}
* to support idempotent write operations that replace data matched by a set of delete filters with
* the result of the write operation.
* <p>
* This is used to build idempotent writes. For example, a query that produces a daily summary
* may be run several times as new data arrives. Each run should replace the output of the last
* run for a particular day in the partitioned output table. Such a job would write using this
* WriteSupport and would pass a filter matching the previous job's output, like
* <code>$"day" === '2018-08-22'</code>, to remove that data and commit the replacement data at
* the same time.
*/
public interface BatchOverwriteSupport extends BatchWriteSupport {
Contributor Author: This class will be used to create the WriteConfig for idempotent overwrite operations. This would be triggered by an overwrite like this (the API could be different):

df.writeTo("table").overwrite($"day" === "2018-08-22")

That would produce an OverwriteData(source, deleteFilter, query) logical plan, which would result in the exec node calling this to create the write config.

/**
* Creates a {@link WriteConfig} for a batch overwrite operation, where the data written replaces
* the data matching the delete filters.
* <p>
* Implementations may reject the delete filters if the delete isn't possible without significant
* effort. For example, partitioned data sources may reject deletes that do not filter by
* partition columns because the filter may require rewriting files without deleted records.
* To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
* error message that identifies which expression was rejected.
*
* @param schema schema of the data that will be written
* @param options options to configure the write operation
* @param deleteFilters filters that match data to be replaced by the data written
* @return a new WriteConfig for the replace data (overwrite) operation
* @throws IllegalArgumentException If the delete is rejected due to required effort
*/
// TODO: replace DataSourceOptions with CaseInsensitiveStringMap
WriteConfig createOverwriteConfig(StructType schema,
DataSourceOptions options,
Filter[] deleteFilters);
}
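
A Scala sketch of how a partitioned source might implement createOverwriteConfig, rejecting filters that touch non-partition columns as the javadoc above suggests. ExampleOverwriteConfig and ExampleOverwriteSupport are hypothetical, and because WriteConfig's own members are not visible in this diff, the sketch assumes implementations can simply attach their state to it.

import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{BatchOverwriteSupport, WriteConfig}
import org.apache.spark.sql.types.StructType

// Hypothetical carrier for the state the write job needs later.
case class ExampleOverwriteConfig(
    schema: StructType,
    options: DataSourceOptions,
    deleteFilters: Array[Filter]) extends WriteConfig

// Only the overwrite-specific method is sketched; the inherited BatchWriteSupport
// methods are left abstract.
abstract class ExampleOverwriteSupport(partitionColumns: Set[String])
    extends BatchOverwriteSupport {

  override def createOverwriteConfig(
      schema: StructType,
      options: DataSourceOptions,
      deleteFilters: Array[Filter]): WriteConfig = {
    deleteFilters.foreach { f =>
      val referenced = f.condition.references.map(_.name).toSet
      if (!referenced.subsetOf(partitionColumns)) {
        // Rejecting non-partition filters avoids rewriting files just to drop rows.
        throw new IllegalArgumentException(
          s"Delete filters may only reference partition columns " +
            s"(${partitionColumns.mkString(", ")}); rejected: $f")
      }
    }
    ExampleOverwriteConfig(schema, options, deleteFilters)
  }
}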
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;

/**
* An interface that adds support to {@link BatchWriteSupport} for a replace data operation that
* replaces partitions dynamically with the output of a write operation.
* <p>
* Data source implementations can implement this interface in addition to {@link BatchWriteSupport}
* to support write operations that replace all partitions in the output table that are present
* in the write's output data.
* <p>
* This is used to implement INSERT OVERWRITE ... PARTITIONS.
*/
public interface BatchPartitionOverwriteSupport extends BatchWriteSupport {
Contributor Author: This class will be used to create a WriteConfig that instructs the data source to replace partitions in the existing data with partitions of a DataFrame. The logical plan would be DynamicPartitionOverwrite.

/**
* Creates a {@link WriteConfig} for a batch overwrite operation, where the data partitions
* written by the job replace any partitions that already exist in the output table.
*
* @param schema schema of the data that will be written
* @param options options to configure the write operation
* @return a new WriteConfig for the dynamic partition overwrite operation
*/
// TODO: replace DataSourceOptions with CaseInsensitiveStringMap
WriteConfig createDynamicOverwriteConfig(StructType schema, DataSourceOptions options);
}
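
Along the same lines, a hypothetical Scala sketch of createDynamicOverwriteConfig; here there is nothing to validate up front because the partitions to replace are discovered from the data the job actually writes.

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{BatchPartitionOverwriteSupport, WriteConfig}
import org.apache.spark.sql.types.StructType

// Hypothetical carrier, as in the previous sketch.
case class ExampleDynamicOverwriteConfig(
    schema: StructType,
    options: DataSourceOptions) extends WriteConfig

abstract class ExampleDynamicOverwriteSupport extends BatchPartitionOverwriteSupport {

  override def createDynamicOverwriteConfig(
      schema: StructType,
      options: DataSourceOptions): WriteConfig = {
    // Record what the job is writing; which partitions get replaced is decided at
    // commit time from the partition values present in the written data.
    ExampleDynamicOverwriteConfig(schema, options)
  }
}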
@@ -18,19 +18,21 @@
package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;

/**
* An interface that defines how to write the data to data source for batch processing.
*
* The writing procedure is:
* 1. Create a writer factory by {@link #createBatchWriterFactory()}, serialize and send it to all
* the partitions of the input data(RDD).
* 1. Create a writer factory by {@link #createBatchWriterFactory(WriteConfig)}, serialize and
* send it to all the partitions of the input data (RDD).
* 2. For each partition, create the data writer, and write the data of the partition with this
* writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
* exception happens during the writing, call {@link DataWriter#abort()}.
* 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
* some writers are aborted, or the job failed with an unknown reason, call
* {@link #abort(WriterCommitMessage[])}.
* 3. If all writers are successfully committed, call
* {@link #commit(WriteConfig,WriterCommitMessage[])}. If some writers are aborted, or the job
* failed with an unknown reason, call {@link #abort(WriteConfig, WriterCommitMessage[])}.
*
* While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
* do it manually in their Spark applications if they want to retry.
@@ -40,53 +42,64 @@
@InterfaceStability.Evolving
public interface BatchWriteSupport {

/**
* Creates a {@link WriteConfig} for a batch write operation.
*
* @param schema schema of the data that will be written
* @param options options to configure the write operation
* @return a new WriteConfig
*/
// TODO: replace DataSourceOptions with CaseInsensitiveStringMap
WriteConfig createWriteConfig(StructType schema, DataSourceOptions options);

/**
* Creates a writer factory which will be serialized and sent to executors.
*
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
DataWriterFactory createBatchWriterFactory();
DataWriterFactory createBatchWriterFactory(WriteConfig config);

/**
* Returns whether Spark should use the commit coordinator to ensure that at most one task for
* each partition commits.
*
* @return true if commit coordinator should be used, false otherwise.
*/
default boolean useCommitCoordinator() {
default boolean useCommitCoordinator(WriteConfig config) {
return true;
}

/**
* Handles a commit message on receiving from a successful data writer.
*
* If this method fails (by throwing an exception), this writing job is considered to have
* failed, and {@link #abort(WriterCommitMessage[])} would be called.
* failed, and {@link #abort(WriteConfig, WriterCommitMessage[])} would be called.
*/
default void onDataWriterCommit(WriterCommitMessage message) {}
default void onDataWriterCommit(WriteConfig config, WriterCommitMessage message) {}

/**
* Commits this writing job with a list of commit messages. The commit messages are collected from
* successful data writers and are produced by {@link DataWriter#commit()}.
*
* If this method fails (by throwing an exception), this writing job is considered to have
* failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
* is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
* failed, and {@link #abort(WriteConfig, WriterCommitMessage[])} would be called. The state of
* the destination is undefined and @{@link #abort(WriteConfig, WriterCommitMessage[])} may not be
* able to deal with it.
*
* Note that speculative execution may cause multiple tasks to run for a partition. By default,
* Spark uses the commit coordinator to allow at most one task to commit. Implementations can
* disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
* tasks may have committed successfully and one successful commit message per task will be
* passed to this commit method. The remaining commit messages are ignored by Spark.
* disable this behavior by overriding {@link #useCommitCoordinator(WriteConfig)}. If disabled,
* multiple tasks may have committed successfully and one successful commit message per task will
* be passed to this commit method. The remaining commit messages are ignored by Spark.
*/
void commit(WriterCommitMessage[] messages);
void commit(WriteConfig config, WriterCommitMessage[] messages);

/**
* Aborts this writing job because some data writers are failed and keep failing when retry,
* or the Spark job fails with some unknown reasons,
* or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
* or {@link #commit(WriterCommitMessage[])} fails.
* or {@link #onDataWriterCommit(WriteConfig,WriterCommitMessage)} fails,
* or {@link #commit(WriteConfig,WriterCommitMessage[])} fails.
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
Expand All @@ -97,5 +110,5 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
*/
void abort(WriterCommitMessage[] messages);
void abort(WriteConfig config, WriterCommitMessage[] messages);
}
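
To tie together the three-step writing procedure described in the javadoc above, here is a rough driver-side sketch of the batch write lifecycle with a WriteConfig threaded through every call. runBatchWrite and runTasks are illustrative stand-ins for what Spark's write exec node does, not API from this PR.

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriterFactory, WriteConfig, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

object BatchWriteLifecycleSketch {

  def runBatchWrite(
      writeSupport: BatchWriteSupport,
      schema: StructType,
      options: DataSourceOptions,
      runTasks: DataWriterFactory => Array[WriterCommitMessage]): Unit = {
    // Step 1: one config per job, created on the driver and passed into every later call.
    val config: WriteConfig = writeSupport.createWriteConfig(schema, options)
    // Step 2: the serializable factory that executors use to create per-partition writers.
    val factory = writeSupport.createBatchWriterFactory(config)
    // `runTasks` stands in for Spark running the write tasks and collecting one
    // WriterCommitMessage per successful task.
    val messages = runTasks(factory)
    try {
      // Step 3: job-level commit sees the same config plus all task commit messages.
      writeSupport.commit(config, messages)
    } catch {
      case t: Throwable =>
        writeSupport.abort(config, messages)
        throw t
    }
  }
}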