[SPARK-25188][SQL] Add WriteConfig to v2 write API. #22190
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;
/**
 * An interface that adds support to {@link BatchWriteSupport} for a replace data operation that
 * replaces a subset of the output table with the output of a write operation. The subset removed
 * is determined by a set of filter expressions.
 * <p>
 * Data source implementations can implement this interface in addition to
 * {@link BatchWriteSupport} to support idempotent write operations that replace data matched by
 * a set of delete filters with the result of the write operation.
 * <p>
 * This is used to build idempotent writes. For example, a query that produces a daily summary
 * may be run several times as new data arrives. Each run should replace the output of the last
 * run for a particular day in the partitioned output table. Such a job would write using this
 * WriteSupport and would pass a filter matching the previous job's output, like
 * <code>$"day" === '2018-08-22'</code>, to remove that data and commit the replacement data at
 * the same time.
 */
public interface BatchOverwriteSupport extends BatchWriteSupport {
Review comment: This class will be used to create the … That would produce a …
  /**
   * Creates a {@link WriteConfig} for a batch overwrite operation, where the data written
   * replaces the data matching the delete filters.
   * <p>
   * Implementations may reject the delete filters if the delete isn't possible without
   * significant effort. For example, partitioned data sources may reject deletes that do not
   * filter by partition columns, because such a filter may require rewriting files without the
   * deleted records. To reject a delete, implementations should throw
   * {@link IllegalArgumentException} with a clear error message that identifies which
   * expression was rejected.
   *
   * @param schema schema of the data that will be written
   * @param options options to configure the write operation
   * @param deleteFilters filters that match data to be replaced by the data written
   * @return a new WriteConfig for the replace data (overwrite) operation
   * @throws IllegalArgumentException if the delete is rejected due to required effort
   */
  // TODO: replace DataSourceOptions with CaseInsensitiveStringMap
  WriteConfig createOverwriteConfig(StructType schema,
                                    DataSourceOptions options,
                                    Filter[] deleteFilters);
}
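The delete-filter validation contract above can be sketched with simplified stand-in types. This is a hypothetical illustration, not the PR's API: `EqualTo` below stands in for Spark's `org.apache.spark.sql.sources.Filter` classes, and partition columns are modeled as a plain list of names.

```java
import java.util.List;

// Hypothetical sketch of the delete-filter check described in the javadoc for
// createOverwriteConfig: a partitioned source accepts deletes that filter only
// by partition columns and rejects anything that would require rewriting files.
public class OverwriteFilterCheck {

    /** Minimal stand-in for an equality filter on a named column (not Spark's real Filter). */
    public static final class EqualTo {
        public final String attribute;
        public final Object value;

        public EqualTo(String attribute, Object value) {
            this.attribute = attribute;
            this.value = value;
        }
    }

    /**
     * Throws IllegalArgumentException naming the rejected expression when a
     * delete filter references a non-partition column, mirroring the contract
     * documented on createOverwriteConfig.
     */
    public static void validateDeleteFilters(List<String> partitionColumns,
                                             EqualTo[] deleteFilters) {
        for (EqualTo filter : deleteFilters) {
            if (!partitionColumns.contains(filter.attribute)) {
                throw new IllegalArgumentException(
                    "Cannot delete by non-partition column: " + filter.attribute);
            }
        }
    }
}
```

A daily-summary job like the one in the javadoc would pass a single filter on the `day` partition column, which this check accepts; a delete keyed on a non-partition column would be rejected with a message identifying the offending attribute.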
@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;
/**
 * An interface that adds support to {@link BatchWriteSupport} for a replace data operation that
 * replaces partitions dynamically with the output of a write operation.
 * <p>
 * Data source implementations can implement this interface in addition to
 * {@link BatchWriteSupport} to support write operations that replace all partitions in the
 * output table that are present in the write's output data.
 * <p>
 * This is used to implement INSERT OVERWRITE ... PARTITIONS.
 */
public interface BatchPartitionOverwriteSupport extends BatchWriteSupport {
Review comment: This class will be used to create a …
  /**
   * Creates a {@link WriteConfig} for a batch overwrite operation, where the data partitions
   * written by the job replace any partitions that already exist in the output table.
   *
   * @param schema schema of the data that will be written
   * @param options options to configure the write operation
   * @return a new WriteConfig for the dynamic partition overwrite operation
   */
  // TODO: replace DataSourceOptions with CaseInsensitiveStringMap
  WriteConfig createDynamicOverwriteConfig(StructType schema, DataSourceOptions options);
}
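The dynamic-overwrite semantics described above can be sketched with plain collections. This is a hypothetical model, not the PR's implementation: a table is represented as a map from partition value to that partition's rows, so that each partition present in the written data replaces the matching existing partition wholesale, while untouched partitions survive.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of INSERT OVERWRITE ... PARTITIONS semantics: partitions
// present in the job's output replace existing partitions; all other existing
// partitions are left unchanged.
public class DynamicOverwriteSketch {

    /** existing and written map each partition value to that partition's rows. */
    public static Map<String, List<String>> overwritePartitions(
            Map<String, List<String>> existing, Map<String, List<String>> written) {
        Map<String, List<String>> result = new HashMap<>(existing);
        // Each written partition replaces the old partition's contents wholesale.
        result.putAll(written);
        return result;
    }
}
```

For a daily-partitioned table, rerunning the job for `2018-08-22` replaces only that partition; earlier days are untouched, which is what makes the write idempotent per partition.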
Review comment: This query validation happens in KafkaStreamingWriteSupport. It was duplicated here and in that class. Now, it happens just once when creating the scan config.