[SPARK-25188][SQL] Add WriteConfig to v2 write API. #22190

Closed
wants to merge 3 commits

Conversation

@rdblue rdblue commented Aug 22, 2018

What changes were proposed in this pull request?

This updates the v2 write path to a structure similar to the v2 read path's. Individual writes are configured and tracked using a WriteConfig (analogous to ScanConfig), and this config is passed to the WriteSupport methods that are specific to a single write, like commit and abort.

This new config will be used to communicate overwrite options to data sources that implement new support classes, BatchOverwriteSupport and BatchPartitionOverwriteSupport. The new config could also be used by implementations to get and hold locks to make operations atomic.

Streaming is also updated to use a StreamingWriteConfig, which carries the options that are specific to a write, like the schema, output mode, and write options.
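
To make the intended structure concrete, here is a rough sketch of the shape described above (illustrative only; the exact method names and signatures in the patch may differ):

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

// A WriteConfig holds the state of one configured write (schema, options,
// overwrite filters, locks, ...); implementations define what goes in it.
interface WriteConfig {
}

// Per-write operations take the config instead of relying on mutable state
// in the WriteSupport instance, mirroring ScanConfig on the read path.
interface BatchWriteSupport {
  WriteConfig createWriteConfig(StructType schema, DataSourceOptions options);

  DataWriterFactory<?> createWriterFactory(WriteConfig config);

  void commit(WriteConfig config, WriterCommitMessage[] messages);

  void abort(WriteConfig config, WriterCommitMessage[] messages);
}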

How was this patch tested?

This is primarily an API change and should pass existing tests.

@rdblue rdblue changed the title SPARK-25188: Add WriteConfig to v2 write API. [SPARK-25188][SQL] Add WriteConfig to v2 write API. Aug 22, 2018
@@ -279,10 +277,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

KafkaWriter.validateQuery(
Contributor Author

This query validation happens in KafkaStreamingWriteSupport. It was duplicated here and in that class. Now, it happens just once when creating the write config.

* <code>$"day" === '2018-08-22'</code>, to remove that data and commit the replacement data at
* the same time.
*/
public interface BatchOverwriteSupport extends BatchWriteSupport {
Contributor Author

This interface will be used to create the WriteConfig for idempotent overwrite operations. It would be triggered by an overwrite like this (the API could be different):

df.writeTo("table").overwrite($"day" === "2018-08-22")

That would produce an OverwriteData(source, deleteFilter, query) logical plan, which would result in the exec node calling this to create the write config.
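
A hypothetical sketch of what this could look like, reusing the names above (the createOverwriteConfig method and its Filter[]-based signature are assumptions for illustration, not the final API):

import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;

// Illustrative only: the exec node for OverwriteData would ask the source for
// a WriteConfig that remembers which rows to delete, so that commit() can
// remove the matching data and add the replacement data in one atomic step.
interface BatchOverwriteSupport extends BatchWriteSupport {
  WriteConfig createOverwriteConfig(StructType schema,
                                    DataSourceOptions options,
                                    Filter[] deleteFilters);
}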

* <p>
* This is used to implement INSERT OVERWRITE ... PARTITIONS.
*/
public interface BatchPartitionOverwriteSupport {
Contributor Author

This interface will be used to create a WriteConfig that instructs the data source to replace partitions in the existing data with the partitions of a dataframe. The logical plan would be DynamicPartitionOverwrite.

* <p>
* This is used to implement INSERT OVERWRITE ... PARTITIONS.
*/
public interface BatchPartitionOverwriteSupport extends BatchWriteSupport {
Contributor Author

This interface will be used to create a WriteConfig that instructs the data source to replace partitions in the existing data with the partitions of a dataframe. The logical plan would be DynamicPartitionOverwrite.
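
As a rough illustration of the dynamic case (the method name is assumed for the sketch; no explicit delete filters are passed, since the partitions to replace are derived from the data being written):

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.types.StructType;

// Illustrative only: the source replaces exactly those partitions that the
// incoming data touches and leaves all other partitions untouched, matching
// INSERT OVERWRITE ... PARTITIONS semantics with dynamic partition values.
interface BatchPartitionOverwriteSupport extends BatchWriteSupport {
  WriteConfig createDynamicOverwriteConfig(StructType schema,
                                           DataSourceOptions options);
}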


/**
* A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements
* the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
* streaming write support.
*/
- class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
+ class MicroBatchWriteSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
Contributor Author

This fixed a typo in the class name.

@rdblue
Contributor Author

rdblue commented Aug 22, 2018

This is related to #21308, which adds DeleteSupport. Both BatchOverwriteSupport and DeleteSupport use the same input to remove data (Filter[]) and can reject deletes that don't align with partition boundaries.
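
For example, an implementation could validate the incoming Filter[] against its partition columns like this (a sketch; the helper class is hypothetical):

import java.util.Set;

import org.apache.spark.sql.sources.Filter;

// Hypothetical helper: reject delete filters that reference non-partition
// columns, so an overwrite or delete stays aligned with partition boundaries.
final class PartitionAlignedFilters {
  static void validate(Filter[] deleteFilters, Set<String> partitionColumns) {
    for (Filter filter : deleteFilters) {
      for (String column : filter.references()) {
        if (!partitionColumns.contains(column)) {
          throw new IllegalArgumentException(
              "Cannot delete by non-partition column: " + column);
        }
      }
    }
  }
}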

@rdblue
Contributor Author

rdblue commented Aug 23, 2018

Retest this please.

@rdblue rdblue force-pushed the SPARK-25188-add-write-config branch from 37d5087 to 847300f on August 23, 2018 at 17:49
@SparkQA

SparkQA commented Aug 23, 2018

Test build #95174 has finished for PR 22190 at commit 847300f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Aug 23, 2018

@rxin, @cloud-fan, @jose-torres: this is the update to add WriteConfig. There's one failed test that I think is unrelated, so this is ready for you to have a look. This will probably need to be updated for the current changes under discussion.

@rdblue
Contributor Author

rdblue commented Aug 23, 2018

Retest this please

@SparkQA

SparkQA commented Aug 24, 2018

Test build #95188 has finished for PR 22190 at commit 847300f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
