[SPARK-25530][SQL] data source v2 API refactor (batch write) #23208
Conversation
* The major responsibility of this interface is to return a {@link Table} for read/write. If you
* want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
* with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
* table schema can be empty in this case.
Generally only the file source (and maybe the JDBC data source) needs to do this. For new data sources, I'd expect them to either implement only the new write APIs (replaceWhere, append, etc.), or define the behavior of `SaveMode.Append` clearly so that it fails if the table doesn't exist.
What does it mean to write to a non-existing table? If you're writing somewhere, the table must exist.
This is for creating a table directly from configuration and an implementation class in the DataFrameWriter API. The target of the write still needs to exist.
"Exist" is a relative concept, I suppose. I think we need to somehow allow for create-on-write functionality, even if many table providers won't want to support it.
@jose-torres, create on write is done by CTAS. It should not be left up to the source whether to fail or create.
I think the confusion here is that this is a degenerate case where Spark has no ability to interact with the table's metadata. Spark must assume that it exists because the caller is writing to it.
The caller is indicating that a table exists, is identified by some configuration, and that a specific implementation can be used to write to it. That's what happens today when source implementations are directly specified.
Maybe it should also be part of the `TableProvider` contract that if the table can't be located, it throws an exception?
I think we can remove SaveMode right away. We don't need to break existing use cases if we add the OverwriteData plan and use it when the user's mode is Overwrite. That helps us get to the point where we can integrate SQL on top of this faster.
I'm not convinced it's safe to remove `SaveMode` right away, when only an `Append` operator is implemented currently. If we do that, `DataFrameWriter.save` needs to throw an exception in a lot of cases, for every mode except append. I don't think this is acceptable right now.

Can we discuss the removal of `SaveMode` at least after all the necessary new write operators are implemented?
`SaveMode` is incompatible with the SPIP to standardize behavior that was voted on and accepted. The save mode in `DataFrameWriter` must be used to create v2 plans that have well-defined behavior, and cannot be passed to implementations in the final version of the v2 read/write API.

I see no reason to put off removing `SaveMode` from the API. If we remove it now, we will avoid having more versions of this API that are fundamentally broken, and we will avoid more implementations that rely on it without being aware that it will be removed.

To your point about whether it is safe: the only cases that are actually used are `SaveMode.Overwrite` and `SaveMode.Append`. To replace those, all that needs to happen is to define what kind of overwrite should happen here (dynamic or truncate).

I can supply the logical plan and physical implementation in a follow-up PR because I already have all of this written and waiting to go in. Or, I can open a PR to merge first if you'd like these changes to depend on that implementation.
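(For illustration, here is a minimal, self-contained sketch of that mapping. The plan names and the `dynamicPartitionOverwrite` flag are simplified stand-ins for the proposed v2 operators, not the actual Spark classes.)

```scala
// Simplified stand-ins for the proposed v2 write operators -- not the real Spark classes.
sealed trait V2WritePlan
case object AppendData extends V2WritePlan
case object DynamicPartitionOverwrite extends V2WritePlan // replace only the partitions being written
case object TruncateAndAppend extends V2WritePlan         // remove all existing data, then write

sealed trait Mode
case object Append extends Mode
case object Overwrite extends Mode

object SaveModeMapping {
  // The decision Spark itself would make, instead of handing SaveMode to the source.
  def toV2Plan(mode: Mode, dynamicPartitionOverwrite: Boolean): V2WritePlan = mode match {
    case Append                                 => AppendData
    case Overwrite if dynamicPartitionOverwrite => DynamicPartitionOverwrite
    case Overwrite                              => TruncateAndAppend
  }
}
```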
What about the file source behavior difference between `SaveMode.Append` and the new append operator? Are you saying we should accept it and ask users to change their code? The file source is widely used with the `df.write.save` API...
I suggest we use the v1 file source as the basis for the behavior for v2. You can see an implementation of that behavior in my other comment. If the table exists, overwrite is a dynamic partition overwrite, append is an append, and ignore does nothing. If the table doesn't exist, then the operation is a CTAS. (Note that we can also check properties to correctly mirror the behavior for static overwrite.)
Your concern is addressed by not using the `Append` plan when the file source would have needed to create the table.

The critical difference is that this behavior is all implemented in Spark instead of passing `SaveMode` to the source. If you pass `SaveMode` to the source, Spark can't guarantee that it is consistent across sources. We are trying to fix inconsistent behavior in v2.
case table: SupportsBatchWrite =>
  val relation = DataSourceV2Relation.create(table, dsOptions)
  // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
  // We should create new end-users APIs for the `AppendData` operator.
According to the discussion in #22688 (comment), the behavior of the append operator and `SaveMode.Append` can be different. We should revisit it when we have the new end-user write APIs.
The example in the referenced comment is this:
spark.range(1).write.format("source").save("non-existent-path")
If a path for a path-based table doesn't exist, then I think that the table doesn't exist. If a table doesn't exist, then the operation for `save` should be CTAS instead of AppendData.
Here, I think the right behavior is to check whether the provider returns a table. If it doesn't, then the table doesn't exist and the plan should be CTAS. If it does, then it must provide the schema used to validate the AppendData operation. Since we don't currently have CTAS, this should throw an exception stating that the table doesn't exist and can't be created.
More generally, the meaning of SaveMode with v1 is not always reliable. I think the right approach is what @cloud-fan suggests: create a new write API for v2 tables that is clear for the new logical plans (I've proposed one and would be happy to open a PR). Once the logical plans are in place, we can go back through this API and move it over to v2 where the behaviors match.
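(A small, self-contained sketch of that check, with hypothetical names. This is a simplified model of the behavior described above, without CTAS, and is distinct from the branch code quoted below.)

```scala
// Simplified model: resolve the table via the provider; without CTAS, a missing table is an
// error rather than a create, and an existing table's schema is what validates the append.
final case class TableSchema(fieldNames: Seq[String])
final case class ResolvedTable(name: String, schema: TableSchema)

object SaveWithoutCtas {
  def planSave(maybeTable: Option[ResolvedTable], querySchema: TableSchema): String =
    maybeTable match {
      case None =>
        throw new IllegalStateException(
          "Table does not exist and cannot be created: CTAS is not implemented yet")
      case Some(t) if t.schema.fieldNames != querySchema.fieldNames =>
        throw new IllegalArgumentException(
          s"Cannot append to ${t.name}: query columns ${querySchema.fieldNames.mkString(", ")} " +
            s"do not match table columns ${t.schema.fieldNames.mkString(", ")}")
      case Some(t) =>
        s"AppendData to ${t.name}" // placeholder for building the real AppendData plan
    }
}
```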
Here is what my branch uses for this logic:

val maybeTable = provider.getTable(identifier)
val exists = maybeTable.isDefined
(exists, mode) match {
  case (true, SaveMode.ErrorIfExists) =>
    throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")
  case (true, SaveMode.Overwrite) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)
    runCommand(df.sparkSession, "insertInto") {
      OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
    }
  case (true, SaveMode.Append) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)
    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }
  case (false, _) =>
    runCommand(df.sparkSession, "save") {
      CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
        ignoreIfExists = mode == SaveMode.Ignore)
    }
  case _ =>
    // table exists and mode is ignore
}
The identifier handling would be different, but the basic idea is the same.
Also, in our environment we always use dynamic overwrites for the overwrite case. We would need to handle that depending on the environment.
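(One way to handle that, as a sketch: let the choice follow session configuration rather than always being dynamic. The sketch below assumes the existing `spark.sql.sources.partitionOverwriteMode` setting is the signal and uses simplified types, not the real planner code.)

```scala
// Pick the overwrite flavor from configuration instead of hard-coding dynamic overwrite.
sealed trait OverwriteFlavor
case object DynamicOverwrite extends OverwriteFlavor // replace only the partitions being written
case object StaticOverwrite  extends OverwriteFlavor // truncate the matching data, then write

object OverwriteChoice {
  def overwriteFlavor(conf: Map[String, String]): OverwriteFlavor =
    conf.getOrElse("spark.sql.sources.partitionOverwriteMode", "static").toLowerCase match {
      case "dynamic" => DynamicOverwrite
      case _         => StaticOverwrite
    }
}
```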
Yeah, that's why I only left a comment and asked to revisit it later. I think we can see a clearer picture after we migrate the file source.
I see no reason to make this API depend on migrating the file source. We know that `SaveMode` must be removed. It makes no sense to create a broken file source implementation and then remove this afterward.
output: Seq[AttributeReference],
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None)
// TODO: use a simple case insensitive map instead.
I'll do it in my next PR.
Thanks for posting this PR @cloud-fan! I'll have a look in the next day or so.
@@ -25,14 +25,14 @@
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * A mix-in interface for {@link Table}. Data sources can implement this interface to
 * provide data writing ability for batch processing.
 *
 * This interface is used to create {@link BatchWriteSupport} instances when end users run
I don't have a better name in mind, so I'll leave it as `WriteSupport` for now. Better naming is welcome, to match `Scan`!
What if we just call it `BatchWrite`?
* provide data writing ability for batch processing.
*
* This interface is used to create {@link BatchWriteSupport} instances when end users run
* {@code Dataset.write.format(...).option(...).save()}.
*/
@Evolving
public interface BatchWriteSupportProvider extends DataSourceV2 {
public interface SupportsBatchWrite extends Table {
To me, it is quite confusing to have both `BatchWriteSupport` and `SupportsBatchWrite`.
That's why I left #23208 (comment). Naming suggestions are welcome!
`Table` exposes `newScanBuilder` without an interface. Why should the write side be different? Doesn't Spark support sources that are read-only and write-only?

I think that both reads and writes should use interfaces to mix support into `Table`, or both should be exposed by `Table` and throw `UnsupportedOperationException` by default, not a mix of the two options.

If `newWriteBuilder` were added to `Table`, then this interface wouldn't be necessary and the name problem goes away.
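(To make the two consistent options concrete, here is a rough sketch in Scala. These are trait shapes only, with assumed method and option types, not the actual Java interfaces under review.)

```scala
trait ScanBuilder
trait WriteBuilder

// Option 1: Table stays minimal; read and write support are both mix-ins.
trait Table { def name: String }
trait SupportsRead  extends Table { def newScanBuilder(options: Map[String, String]): ScanBuilder }
trait SupportsWrite extends Table { def newWriteBuilder(options: Map[String, String]): WriteBuilder }

// Option 2: Table exposes both and throws by default; sources override what they support.
trait TableWithDefaults {
  def name: String
  def newScanBuilder(options: Map[String, String]): ScanBuilder =
    throw new UnsupportedOperationException(s"$name does not support reads")
  def newWriteBuilder(options: Map[String, String]): WriteBuilder =
    throw new UnsupportedOperationException(s"$name does not support writes")
}
```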
I do think read-only or write-only is a necessary feature, according to what I've seen on the dev list. Maybe we should move `newScanBuilder` from `Table` to the mixin traits.
I'm fine either way, as long as we are consistent between the read and write sides.
@cloud-fan, I see that this adds […]. The main parts that we discussed there were: […]

We don't need to add the overwrite mix-ins here, but I would expect to see a WriteBuilder that returns a Write. The Write would expose BatchWrite and StreamWrite (if they are different), or could directly expose the WriteFactory, commit, abort, etc. WriteBuilder would be extensible so that SupportsOverwrite and SupportsDynamicOverwrite can be added as mix-ins at some point.
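(As a rough sketch of that life-cycle: the names follow the comment above but are simplified stand-ins, not the finalized API; the filter argument in particular is only a placeholder.)

```scala
// The builder configures a write, the resulting Write exposes the batch implementation,
// and overwrite capabilities are added as optional mix-ins on the builder.
trait WriterCommitMessage
trait DataWriterFactory // placeholder for the factory that creates per-partition writers

trait BatchWrite {
  def createWriterFactory(): DataWriterFactory
  def commit(messages: Array[WriterCommitMessage]): Unit
  def abort(messages: Array[WriterCommitMessage]): Unit
}

trait Write { def toBatch: BatchWrite } // a streaming variant could live alongside toBatch

trait WriteBuilder { def build(): Write }
trait SupportsOverwrite        extends WriteBuilder { def overwrite(filter: String): WriteBuilder }
trait SupportsDynamicOverwrite extends WriteBuilder { def overwriteDynamicPartitions(): WriteBuilder }
```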
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None)
// TODO: use a simple case insensitive map instead.
options: DataSourceOptions)
Why change this now, when DataSourceOptions will be replaced? I would say just leave it as a map and update it once later.
Because this makes the code cleaner; otherwise I need to write more code to convert a map to `DataSourceOptions` multiple times inside `DataSourceV2Relation`.

I don't have a strong preference here and just picked the easiest approach for me. If you think using a map here is clearer, I can add the extra code.
A private method to do that existed in the past. Why not just revive it?
It was done in multiple places before:
https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L62
https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L153
https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L171
If you strongly prefer it, I can follow that approach and update the code.
I think it is a good idea to avoid needless churn, so I would prefer using the original Map[String, String].
@rdblue I tried to add […]. Because of this, I feel we don't need […]. Let me know if you have other ideas. Thanks for your review!
@cloud-fan, what are you suggesting to use as a design? If you think this shouldn't mirror the read side, then let's be clear on what it should look like. Maybe that's a design doc, or maybe that's a discussion thread on the mailing list. Whatever option we go for, we still need to have a plan for exposing the replace-by-filter and replace-dynamic-partitions methods, whatever they end up being. We also need the life-cycle to match.
Let's move the high-level discussion to the doc: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing
def newScanBuilder(): ScanBuilder = {
  val dsOptions = new DataSourceOptions(options.asJava)
  table.newScanBuilder(dsOptions)
def newWriteSupport(inputSchema: StructType, mode: SaveMode): Optional[BatchWriteSupport] = {
Nit: add a comment for the method, especially about when it will return None, although it is explained in `SupportsBatchWrite.createBatchWriteSupport`.
I would hold off on this discussion for now. I think this is going to require significant changes.
import org.apache.spark.sql.SaveMode;

// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
// Spark 3.0 when all the new write operators are finished.
What is the blocker issue that tracks this?
https://issues.apache.org/jira/browse/SPARK-26356
Let me put it in the doc.
/**
 * An interface for building the {@link BatchWrite}. Implementations can mix in interfaces like
 * {@link SupportsSaveMode} to support different ways to write data to data sources.
`SupportsSaveMode` is essentially deprecated, so the documentation should recommend using `SupportsOverwrite` or some other mix-in.
public interface WriteBuilder {

/**
 * Returns a new builder with the `queryId`. `queryId` is a unique string of the query. It's
The important part of this documentation is that it passes Spark's query ID. The fact that this returns a builder for method chaining is secondary, so I would recommend noting that in `@return` but not in the main description. Otherwise, it appears like this could be a refinement pattern (returns a new builder) and that isn't the intent.
}

/**
 * Returns a new builder with the schema of the input data to write.
Same as above: this should state that it sets the schema in the builder, not that it returns a new builder.
*
* Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
* {@link SupportsSaveMode}.
Returning null for now sounds fine to me.
* {@link SupportsSaveMode} to support different ways to write data to data sources.
*/
@Evolving
public interface WriteBuilder {
I think this documentation needs to state that unless otherwise modified by a mix-in, the write that is configured by this builder is to append data without affecting existing data.
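(A short, self-contained sketch of what that default means at the call site. The builder below is hypothetical and only mirrors the interface under review; method names and types are assumptions. Configuring nothing beyond the query ID and schema yields a plain append.)

```scala
// Hypothetical builder modeling the documented default: if no overwrite-style mix-in is
// invoked, the configured write is a plain append that leaves existing data untouched.
final case class SketchWriteBuilder(
    queryId: String = "",
    columns: Seq[String] = Nil,
    truncateFirst: Boolean = false) {

  def withQueryId(id: String): SketchWriteBuilder = copy(queryId = id)
  def withInputColumns(fields: Seq[String]): SketchWriteBuilder = copy(columns = fields)
  def truncate(): SketchWriteBuilder = copy(truncateFirst = true) // mix-in behavior, modeled inline

  def buildForBatch(): String =
    if (truncateFirst) s"truncate then write (query $queryId, columns ${columns.mkString(",")})"
    else s"append (query $queryId, columns ${columns.mkString(",")})" // the default
}

object SketchUsage {
  // Nothing beyond the query ID and schema is configured, so this describes an append.
  val describe: String =
    SketchWriteBuilder().withQueryId("q-123").withInputColumns(Seq("id", "data")).buildForBatch()
}
```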
}

case _ => throw new AnalysisException(
  s"data source ${table.name} does not support SaveMode")
The error should be that the source doesn't support the mode, not that it doesn't support SaveMode generally.
@cloud-fan, I think this is looking good overall. The use of […]
Hi @rdblue, thanks for the review! It will be great to finish all the write operations soon, and adding overwrite is a good next step!
I recall this was discussed during the latest sync call? I think we are good?
  }
}
if (mode == SaveMode.Overwrite) {
  fs.delete(hadoopPath, true)
IMO this is semantically different from what I expect: overwrite means that once all the data is there, replace the existing dir. This is implemented as: remove the existing dir, then place the data there. The failure-mode behavior is different.
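(The difference being pointed out, as a sketch with Hadoop `FileSystem` calls. The staged variant is only an illustration of "replace the existing dir once all the data is there", not what the test source does; the helper names are hypothetical.)

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object OverwriteSemanticsSketch {
  // Delete-then-write: if the write fails midway, the old data is already gone.
  def overwriteByDeletingFirst(fs: FileSystem, target: Path)(write: Path => Unit): Unit = {
    fs.delete(target, true)
    write(target)
  }

  // Write-then-swap: stage the new data first and replace the target only once it is
  // complete, so a failed write leaves the old data in place.
  def overwriteByStaging(fs: FileSystem, target: Path, staging: Path)(write: Path => Unit): Unit = {
    write(staging)
    fs.delete(target, true)
    if (!fs.rename(staging, target)) {
      throw new java.io.IOException(s"Failed to rename $staging to $target")
    }
  }
}
```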
Because of the ambiguity of SaveMode, I think a data source is free to define its own behavior. It's fine that this simple data source in the test defines the overwrite behavior in this way.
I think we can do it this way in the test case to keep it simple.
LGTM
LGTM
Thanks! Merged to master.
What changes were proposed in this pull request?
Adjust the batch write API to match the read API refactor after #23086
The doc with high-level ideas:
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing
Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`. It renames `WriteSupport` to `Write`. It also cleans up some code as the batch API is completed.

This PR also removes the test from #22688. Now a data source must return a table for read/write.
A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for streaming APIs.
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think streaming APIs would remain to use `OutputMode`, and new end-user write APIs will apply to batch only, at least in the near future.
3. We will remove `SaveMode` from the data source API: https://issues.apache.org/jira/browse/SPARK-26356

How was this patch tested?
existing tests