[SPARK-25188][SQL] Add WriteConfig to v2 write API. #22190
Conversation
@@ -279,10 +277,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

KafkaWriter.validateQuery(
This query validation happens in `KafkaStreamingWriteSupport`. It was duplicated here and in that class. Now it happens just once, when the write config is created.
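For context, a minimal sketch of that idea using stub types rather than the connector's real classes (the names and signatures below are illustrative, not the PR's exact API): validation runs once, where the config is created, instead of in both the provider and the write support.

```scala
// Illustration only: stubs standing in for the Kafka connector classes
// (KafkaSourceProvider, KafkaStreamingWriteSupport, KafkaWriter.validateQuery).
final case class KafkaWriteConfig(topic: Option[String], producerParams: Map[String, String])

object QueryValidator {
  // Stand-in for the connector's query validation: fail fast on a bad configuration.
  def validate(topic: Option[String], producerParams: Map[String, String]): Unit =
    require(topic.isDefined || producerParams.contains("topic"), "No Kafka topic configured")
}

class StreamingWriteSupportSketch(topic: Option[String], producerParams: Map[String, String]) {
  // Single validation point: creating the write config validates the query exactly once,
  // so the provider no longer needs its own duplicate call.
  def createWriteConfig(): KafkaWriteConfig = {
    QueryValidator.validate(topic, producerParams)
    KafkaWriteConfig(topic, producerParams)
  }
}
```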
* <code>$"day" === '2018-08-22'</code>, to remove that data and commit the replacement data at | ||
* the same time. | ||
*/ | ||
public interface BatchOverwriteSupport extends BatchWriteSupport { |
This class will be used to create the `WriteConfig` for idempotent overwrite operations. It would be triggered by an overwrite like this (the API could be different):

df.writeTo("table").overwrite($"day" === "2018-08-22")

That would produce an `OverwriteData(source, deleteFilter, query)` logical plan, which would result in the exec node calling this to create the write config.
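A rough sketch of that flow with stand-in types; `OverwriteData`, the filter representation, and the method name below follow this comment rather than a finalized API:

```scala
// Stand-in types modeling: user overwrite call -> OverwriteData logical plan ->
// exec node asks the source for a WriteConfig that carries the delete filter.
object OverwriteFlowSketch {
  trait LogicalPlan
  trait WriteConfig

  trait BatchOverwriteSupport {
    // Called by the exec node: the config ties the delete filter and the new data
    // together so both are committed as a single, idempotent operation.
    def createOverwriteConfig(deleteFilter: String, options: Map[String, String]): WriteConfig
  }

  // The plan produced by something like df.writeTo("table").overwrite($"day" === "2018-08-22").
  final case class OverwriteData(
      source: BatchOverwriteSupport,
      deleteFilter: String, // simplified; a real filter/expression type would be used
      query: LogicalPlan)
    extends LogicalPlan

  // Exec-node side, greatly simplified: create the per-write config before running the write.
  def configureWrite(plan: OverwriteData): WriteConfig =
    plan.source.createOverwriteConfig(plan.deleteFilter, Map.empty[String, String])
}
```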
 * <p>
 * This is used to implement INSERT OVERWRITE ... PARTITIONS.
 */
public interface BatchPartitionOverwriteSupport extends BatchWriteSupport {
This class will be used to create a `WriteConfig` that instructs the data source to replace partitions in the existing data with the partitions of a dataframe. The logical plan would be `DynamicPartitionOverwrite`.
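A similarly hedged sketch for the dynamic-partition case, again with stand-in types (`DynamicPartitionOverwrite` and the method name follow this comment, not a finalized signature):

```scala
// Stand-in types: the write config tells the source to replace exactly the partitions
// that appear in the written dataframe (INSERT OVERWRITE ... PARTITIONS), leaving
// all other existing partitions untouched.
object PartitionOverwriteSketch {
  trait LogicalPlan
  trait WriteConfig

  trait BatchPartitionOverwriteSupport {
    // Illustrative: commit would later swap in the partitions produced by this write.
    def createDynamicOverwriteConfig(options: Map[String, String]): WriteConfig
  }

  // The logical plan the exec node would translate into a call to the method above.
  final case class DynamicPartitionOverwrite(
      source: BatchPartitionOverwriteSupport,
      query: LogicalPlan)
    extends LogicalPlan
}
```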
/**
 * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements
 * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
 * streaming write support.
 */
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
+class MicroBatchWriteSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
This fixed a typo in the class name.
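For context, a simplified stub of what this wrapper does (signatures reduced to the essentials, not the exact classes): the epoch ID captured at construction is forwarded to the streaming write support on commit and abort.

```scala
// Simplified stand-ins: the batch-style interface has no epoch parameter, so the
// wrapper fixes the epoch at construction and forwards it to the streaming side.
object MicroBatchWrapperSketch {
  trait WriterCommitMessage

  trait StreamingWriteSupport {
    def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
    def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  }

  class MicroBatchWriteSupport(epochId: Long, val writeSupport: StreamingWriteSupport) {
    // Non-streaming signatures: the epoch comes from the constructor, not the caller.
    def commit(messages: Array[WriterCommitMessage]): Unit = writeSupport.commit(epochId, messages)
    def abort(messages: Array[WriterCommitMessage]): Unit = writeSupport.abort(epochId, messages)
  }
}
```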
This is related to #21308, which adds …
Retest this please.
37d5087 to 847300f
Test build #95174 has finished for PR 22190 at commit …
@rxin, @cloud-fan, @jose-torres: this is the update to add `WriteConfig`.
Retest this please
Test build #95188 has finished for PR 22190 at commit …
What changes were proposed in this pull request?
This updates the v2 write path to a structure similar to the v2 read path. Individual writes are configured and tracked using `WriteConfig` (analogous to `ScanConfig`), and that config is passed to the `WriteSupport` methods that are specific to a single write, like `commit` and `abort`.

The new config will be used to communicate overwrite options to data sources that implement the new support classes, `BatchOverwriteSupport` and `BatchPartitionOverwriteSupport`. Implementations could also use the config to get and hold locks so that operations are atomic.

Streaming is also updated to use a `StreamingWriteConfig`, which carries the options that are specific to a write, like schema, output mode, and write options.
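A self-contained sketch of that shape, with stub types and simplified signatures (not the exact interfaces this PR adds):

```scala
// Stub types illustrating the structure described above: a per-write config object,
// created once and then passed to every method that acts on that specific write.
object WriteConfigSketch {
  trait WriterCommitMessage
  trait WriteConfig // per-write state: schema, options, overwrite filters, locks, ...

  trait BatchWriteSupport {
    // Configure a single write, mirroring how the read path configures a ScanConfig.
    def createWriteConfig(options: Map[String, String]): WriteConfig

    // Per-write methods take the config, so a source can tie commit/abort to that write
    // (and, for example, release any locks it acquired when the config was created).
    def commit(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit
    def abort(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit
  }

  // Overwrite variants would extend the base trait and return configs that also carry
  // the overwrite filter or the dynamic-partition behavior.
  trait BatchOverwriteSupport extends BatchWriteSupport {
    def createOverwriteConfig(deleteFilter: String, options: Map[String, String]): WriteConfig
  }
}
```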
How was this patch tested?
This is primarily an API change and should pass existing tests.