[SPARK-25528][SQL] data source v2 API refactor (batch read) #23086
Conversation
Force-pushed from f06b5c5 to 77a2c08.
*/
// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we finish the API refactor
// completely.
case class DataSourceV2StreamingScanExec(
I have to use two physical nodes, since batch and streaming have different APIs now.
Force-pushed from 77a2c08 to 207b0b9.
Test build #99003 has finished for PR 23086.
Test build #99004 has finished for PR 23086.
Test build #99005 has finished for PR 23086.
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema the user-specified schema.
*/
default Table getTable(DataSourceOptions options, StructType schema) {
I know that this is from prior DataSourceV2 semantics, but what's the difference between providing the schema here and the column pruning aspect of `ScanBuilder`?
Basically I'm just saying we should push down this requested schema into the `ScanBuilder`.
It's a different thing. Imagine you are reading a Parquet file: you know exactly what its physical schema is, and you don't want Spark to waste a job on inferring the schema, so you specify the schema when reading.
Next, Spark analyzes the query and figures out what the required schema is. This step is automatic and driven by Spark.
I agree with @cloud-fan. These are slightly different uses.
Here, it is supplying a schema for how to interpret data files. Say you have CSV files with columns `id`, `ts`, and `data`, and no headers. This tells the CSV reader what the columns are and how to convert the data to useful types (bigint, timestamp, and string). Column projection will later request those columns, maybe just `id` and `data`. If you only passed the projection schema, then the `ts` values would be returned for the `data` column.
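To make the distinction concrete, here is a minimal sketch; the file path and column names are hypothetical. The user-specified schema describes the physical files, while the projection is derived later by Spark from the query:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("schema-vs-projection").getOrCreate()

// User-specified schema: tells the reader how to interpret the headerless CSV
// files, so Spark does not run a job to infer the schema.
val fileSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("ts", TimestampType),
  StructField("data", StringType)))

val df = spark.read.schema(fileSchema).csv("/path/to/headerless/csv")

// Projection: Spark prunes to the required columns during analysis/optimization.
df.select("id", "data").show()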
* records from the partitions.
*/
@InterfaceStability.Evolving
public interface Batch {
`BatchScan`, perhaps?
I don't have a strong preference. I feel it's a little clearer to distinguish between scan and batch.
Force-pushed from 207b0b9 to 83818fa.
Test build #99038 has finished for PR 23086.
Force-pushed from 83818fa to 4407d51.
Test build #99042 has finished for PR 23086.
Force-pushed from 4407d51 to 188be4f.
Test build #99088 has finished for PR 23086.
No serious problems I see, but I've mostly looked to ensure the streaming components will still work.
project/MimaExcludes.scala
@@ -149,7 +149,8 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),
This list of exclusions is getting kinda silly. Is there some way to just completely exclude this package from compatibility checks until we've stabilized it?
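For example, MiMa's problem filters accept wildcard patterns, so a single package-wide rule along these lines could work (a sketch only, not necessarily the rule to adopt):

import com.typesafe.tools.mima.core._

// Excludes every compatibility problem reported under the evolving DSv2 package.
ProblemFilters.exclude[Problem]("org.apache.spark.sql.sources.v2.*")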
Test build #99135 has finished for PR 23086.
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;
#21306 (TableCatalog support) adds this class as `org.apache.spark.sql.catalog.v2.Table` in the `spark-catalyst` module. I think it needs to be in the catalyst module and should probably be in the `o.a.s.sql.catalog.v2` package as well.
The important one is moving this to the catalyst module. The analyzer is in catalyst and all of the v2 logical plans and analysis rules will be in catalyst as well, because we are standardizing behavior. The standard validation rules should be in catalyst, not in a source-specific or hive-specific package in the sql-core or hive modules.
Because the logical plans and validation rules are in the catalyst package, the `TableCatalog` API needs to be there as well. For example, when a catalog table identifier is resolved for a read query, one of the results is a `TableCatalog` instance for the catalog portion of the identifier. That catalog is used to load the v2 table, which is then wrapped in a v2 relation for further analysis. Similarly, the write path should also validate that the catalog exists during analysis by loading it, and would then pass the catalog in a v2 logical plan for `CreateTable` or `CreateTableAsSelect`.
I also think that it makes sense to use the `org.apache.spark.sql.catalog.v2` package for `Table` because `Table` is more closely tied to the `TableCatalog` API than to the data source API. The link to DSv2 is that `Table` carries `newScanBuilder`, but the rest of the methods exposed by `Table` are for catalog functions, like inspecting a table's partitioning or table properties.
Moving this class would make adding `TableCatalog` less intrusive.
Moving this to the Catalyst package would set a precedent for user-overridable behavior to live in the catalyst project. I'm not aware of anything in the Catalyst package being considered as public API right now. Are we allowed to start such a convention at this juncture?
Everything in catalyst is considered private (even though it has public visibility, for debugging), and it's best to keep it that way.
Why does this `Table` API need to be in catalyst? It's not even a plan. We can define a table LogicalPlan interface in catalyst, and implement it in the SQL module with this `Table` API.
I can understand wanting to keep everything in Catalyst private. That's fine with me, but I think that Catalyst does need to be able to interact with tables and catalogs that are supplied by users.
For example: Our tables support schema evolution. Specifically, reading files that were written before a column was added. When we add a column, Spark shouldn't start failing in analysis for an AppendData operation in a scheduled job (as it would today). We need to be able to signal to the validation rule that the table supports reading files that are missing columns, so that Spark can do the right validation and allow writes that used to work to continue.
How would that information -- support for reading missing columns -- be communicated to the analyzer?
Also, what about my example above: how will the analyzer load tables using a user-supplied catalog if catalyst can't use any user-supplied implementations?
We could move all of the v2 analysis rules, like ResolveRelations, into the core module, but it seems to me that this requirement is no longer providing value if we have to do that. I think that catalyst is the right place for common plans and analysis rules to live because it is the library of common SQL components.
Wherever the rules and plans end up, they will need access to the `TableCatalog` API.
It's unclear to me what would be the best choice:
- move data source API to catalyst module
- move data source related rules to SQL core module
- define private catalog related APIs in catalyst module and implement them in SQL core
Can we delay this discussion until we have a PR that adds catalog support, after the refactor?
> Can we delay this discussion until we have a PR that adds catalog support, after the refactor?
Yes, that works.
But can we move `Table` to the `org.apache.spark.sql.catalog.v2` package where `TableCatalog` is defined in the other PR? I think `Table` should be defined with the catalog API, and moving it later would require import changes to any file that references `Table`.
For other reviewers: in the DSv2 community sync, we decided to move data source v2 into a new module, `sql-api`, and make catalyst depend on it. This will be done in a follow-up.
I just went to make this change, but it requires moving any SQL class from catalyst referenced by the API into the API module as well... Let's discuss the options more on the dev list thread.
* The builder can take some query-specific information to do operator pushdown, and keep this
* information in the created {@link Scan}.
*/
ScanBuilder newScanBuilder(DataSourceOptions options);
`DataSourceOptions` isn't simply a map for two main reasons that I can tell: first, it forces options to be case-insensitive, and second, it exposes helper methods to identify tables, like `tableName`, `databaseName`, and `paths`. In the new abstraction, the second use of `DataSourceOptions` is no longer needed. The table is already instantiated by the time this is called.
We should reconsider `DataSourceOptions`. The `tableName` methods aren't needed, and we also no longer need to forward properties from the session config because the way tables are configured has changed (catalogs handle that). I think we should remove this class and instead use the more direct implementation, `CaseInsensitiveStringMap` from #21306. The behavior of that class is obvious from its name, and it would be shared between the v2 APIs, both catalog and data source.
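For illustration, a minimal sketch of the case-insensitive behavior in question (this is not the class from #21306, just the idea):

import java.util.Locale

// Keys are normalized to lower case on both insertion and lookup.
class CaseInsensitiveStringMap(original: Map[String, String]) {
  private val delegate: Map[String, String] =
    original.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def get(key: String): Option[String] =
    delegate.get(key.toLowerCase(Locale.ROOT))
}

// Usage: lookups ignore the caller's casing.
// new CaseInsensitiveStringMap(Map("Path" -> "/tmp/data")).get("path")  // Some("/tmp/data")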
Makes sense to me. `DataSourceOptions` was carrying along identifiers that really belong to a table identifier and that should be interpreted at the catalog level, not the data read level. In other words, the implementation of this `Table` should already know what locations to look up (e.g., "the files comprising dataset D"); now it's a matter of how (e.g., pushdown, filter predicates).
I agree. Since `CaseInsensitiveStringMap` is not in the code base yet, shall we do it in the follow-up?
Either in a follow-up or you can add the class in this PR. Either way works for me.
/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
Javadoc would normally also add `@throws` with this information. I agree it should be here as well.
What I learned is that we should only declare checked exceptions. See http://www.javapractices.com/topic/TopicAction.do?Id=171
Strange, that page links to one with the opposite advice: http://www.javapractices.com/topic/TopicAction.do?Id=44
I think that `@throws` is a good idea whenever you want to document an exception type as part of the method contract. Since it is expected that this method isn't always implemented and may throw this exception, I think you were right to document it. And documenting exceptions is best done with `@throws` to highlight them in Javadoc.
The page you linked to makes the argument that unchecked exceptions aren't part of the method contract and cannot be relied on. But documenting this shows that it is part of the contract or expected behavior, so I think docs are appropriate.
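For instance, a doc-comment sketch of declaring an expected but unchecked exception with @throws (the trait and method names are purely illustrative):

trait ExampleTable {
  /**
   * Returns a human-readable name for this table.
   *
   * @throws UnsupportedOperationException if the implementation does not expose a name
   */
  def name(): String =
    throw new UnsupportedOperationException("This table does not expose a name.")
}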
added the throw clause.
/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
*
Minor: Javadoc doesn't automatically parse empty lines as new paragraphs. If you want to have one in documentation, then use `<p>`.
Thanks for the hint about new paragraphs!
*
* Note that, this may not be a full scan if the data source supports optimization like filter
* push-down. Implementations should check the status of {@link Scan} that creates this batch,
* and adjust the resulting {@link InputPartition input partitions}.
I think this is a little unclear. Implementations do not necessarily check the scan. This Batch is likely configured with a filter and is responsible for creating splits for that filter.
* {@link Table} that creates this scan implements {@link SupportsBatchRead}.
*/
default Batch toBatch() {
throw new UnsupportedOperationException("Do not support batch scan.");
Nit: text should be "Batch scans are not supported". Starting with "Do not" makes the sentence a command.
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2._
Nit: using wildcard imports makes it harder to review without an IDE because it is more difficult to find out where symbols come from.
I do think this one is too nitpicky. If the import list gets long, it should be a wildcard. Use an IDE for large reviews like this if needed.
It's the IDE that turns it into a wildcard, because the import list gets too long.
I am using an IDE for this review, but this makes future reviews harder. I realize it isn't a major issue, but I think it is a best practice to not use wildcard imports.
@@ -40,8 +40,8 @@ import org.apache.spark.sql.types.StructType
* @param userSpecifiedSchema The user-specified schema for this scan.
*/
case class DataSourceV2Relation(
source: DataSourceV2,
readSupport: BatchReadSupport,
source: TableProvider,
May want to note that `TableProvider` will be removed when the write side is finished, since it is only used for `createWriteSupport`, which will be exposed through `Table`.
done
* </ul>
*/
@Evolving
public interface Table {
It would be helpful for a `Table` to also expose a name or identifier of some kind. The `TableIdentifier` passed into `DataSourceV2Relation` is only used in `name` to identify the relation's table. If the name (or location, for path-based tables) were supplied by the table instead, it would remove the need to pass it in the relation.
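A minimal sketch of that suggestion (illustrative only, not the actual interface change):

// If the table exposes its own identifying information, the relation no longer
// needs a separate TableIdentifier argument.
trait NamedTable {
  /** A table identifier such as "db.table", or a path for path-based tables. */
  def name(): String
}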
provider, table, output, options, ident, userSpecifiedSchema)
}

def createRelationForWrite(
Also note that this is temporary until the write side is finished?
done
case other: DataSourceV2ScanExec =>
output == other.output && readSupport.getClass == other.readSupport.getClass &&
case other: DataSourceV2StreamingScanExec =>
output == other.output && source.getClass == other.source.getClass &&
Should this implement identity instead of equality? When would two ScanExec nodes be equal instead of identical?
Also, I don't think that this equals implementation is correct. First, it should not check for the streaming class. Second, it should check whether the scan is equal, not whether the options and the source are the same (plus, source will be removed).
Unfortunately, implementing true equality (not just identity) must in some way rely on a user-supplied class. A scan is the same if it will produce the same set of rows and columns in those rows. That means equality depends on the filter, projection, and source data (i.e. table). We can use `pushedFilters` and `output` for the filter and projection. But checking that the source data is the same requires using either the scan's `equals` method (which would also satisfy the filter and projection checks) or checking that the partitions are the same. Both `Scan` and `InputPartition` implementations are provided by sources, so their `equals` methods may not be implemented.
Because this must depend on checking equality of user-supplied objects, I think it would be much easier to make this depend only on equality of the `Scan`:
override def equals(other: Any): Boolean = other match {
case scanExec: DataSourceV2ScanExec => scanExec.scan == this.scan
case _ => false
}
That may fall back to identity if the user hasn't supplied an equals method, but I don't see a way to avoid it.
/**
* Physical plan node for scanning data from a data source.
* Physical plan node for scanning a batch of data from a data source.
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
I think we can remove source by updating `equals` and `hashCode` to check just the `Scan`.
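A hedged companion to the equals sketch above, with the surrounding class elided:

// Pairs with the scan-based equals shown earlier; both ignore `source` and `options`.
override def hashCode(): Int = scan.hashCode()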
/**
* Physical plan node for scanning data from a data source.
* Physical plan node for scanning a batch of data from a data source.
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
Similarly, options were used to create the `Scan`, so they don't need to be passed here if they are not used in `equals` and `hashCode`.
`source` and `options` are also used to define the string format of this plan, as it extends `DataSourceV2StringFormat`.
Maybe we don't need a pretty string format for the physical scan node?
With a catalog, there is no expectation that a `source` will be passed. This could be a string that identifies either the source or the catalog, for a good string representation of the physical plan. This is another area where I think `Table.name` would be helpful because the table's identifying information is really what should be shown instead of its source or catalog.
For options, these are part of the scan and aren't used to affect the behavior of this physical node. I think that means that they shouldn't be part of the node's arguments.
I think a good way to solve this problem is to change the pretty string format to use `Scan` instead. That has the information that defines what this node is doing, like the filters, projection, and options. And being able to convert a logical scan to text would be useful across all 3 execution modes.
@@ -54,27 +53,17 @@ case class DataSourceV2ScanExec(
Seq(output, source, options).hashCode()

override def outputPartitioning: physical.Partitioning = readSupport match {
override def outputPartitioning: physical.Partitioning = scan match {
Should `SupportsReportPartitioning` extend `Batch` instead of `Scan`? Then this physical node could just be passed the `Batch` and not the `Scan`, `PartitionReaderFactory`, and partitions.
In fact, I think that this node only requires `output: Seq[AttributeReference], batch: Batch`.
Filter pushdown happens at the planning phase, so the physical plan is the only place where users can see which filters were pushed. Shall we keep `pushedFilters` in the scan node?
If you take my suggestion above to inspect the `Scan` to build the string representation of this node, then I think the arguments should be `scan` and `output`. Then the batch can be fetched here.
For `pushedFilters`, I think they should be fetched from the configured scan to build the string representation.
Force-pushed from c24aeab to 38fdac6.
@transient scanConfig: ScanConfig)
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
scanDesc: String,
@transient batch: Batch)
@rdblue I want to reuse this plan for batch and micro-batch. Here the plan doesn't take a `Scan` but just a `Batch`, so the caller side is free to decide how to produce batch(es) from a scan.
Sounds good to me.
Test build #99428 has finished for PR 23086.
Test build #99459 has finished for PR 23086.
Test build #99461 has finished for PR 23086.
* meaningful description.
* </p>
*/
default String description() {
I would have expected the default implementation to show both pushed filters and the read schema, along with the implementation class name. Read schema can be accessed by `readSchema`. Should there also be a way to access the pushed filters? `pushedFilters` seems like a good idea to me. (This can be added later.)
Since this is an interface, and filter pushdown is optional, I'm not sure how to report `pushedFilters` here.
The read schema is always reported, see `DataSourceV2ScanExec.simpleString`. Maybe we should still keep `pushedFilters` in `DataSourceV2ScanExec`, and display it in the plan string format. What do you think?
What about adding `pushedFilters` that defaults to `new Filter[0]`? Then users should override that to add filters to the description, if they are pushed. I think a Scan should be able to report its options, especially those that distinguish it from other scans, like pushed filters.
I guess we could have some wrapper around the user-provided Scan that holds the Scan options. I would want to standardize that instead of doing it in every scan exec node.
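A rough sketch of that default, written in Scala rather than the Java interface and with a hypothetical trait name:

import org.apache.spark.sql.sources.Filter

trait ScanWithFilters {
  /** Filters pushed to this scan; empty unless the implementation overrides it. */
  def pushedFilters(): Array[Filter] = Array.empty[Filter]

  /** A description that includes the pushed filters, as suggested above. */
  def description(): String =
    s"${getClass.getName} [filters: ${pushedFilters().mkString(", ")}]"
}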
+1. There are only minor suggestions left from me. I'd like to see the default implementation of …
Test build #99493 has finished for PR 23086.
I still do not think we should mix the catalog support with the data source APIs. Catalog is a well-defined concept in database systems, and Spark SQL follows that. The so-called "table catalog" is not a catalog to me. The data source APIs in this PR look good to me. Merged to master.
We are trying to keep these separate.
I'm glad that you're interested in joining the discussion on multi-catalog support. Let's have that discussion on the catalog issues or discussion threads on the dev list, not here on an update to the read API.
@cloud-fan, thanks for getting this done! I'll wait for the equivalent write-side PR.
## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after #23086

The doc with high-level ideas: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing

Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite`, and make it extend `Table`. Renames `WriteSupport` to `Write`. It also cleans up some code as batch API is completed. This PR also removes the test from #22688. Now data source must return a table for read/write.

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for streaming APIs
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think streaming APIs would remain to use `OutputMode`, and new end-user write APIs will apply to batch only, at least in the near future.
3. We will remove `SaveMode` from data source API: https://issues.apache.org/jira/browse/SPARK-26356

## How was this patch tested?

existing tests

Closes #23208 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.

More concretely, it adds
1. `TableProvider`, works like an anonymous catalog
2. `Table`, represents a structured data set.
3. `ScanBuilder` and `Scan`, a logical represents of data source scan
4. `Batch`, a physical representation of data source batch scan.

## How was this patch tested?

existing tests

Closes apache#23086 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

Following apache#23086, this PR does the API refactor for micro-batch read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)

The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchReadStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources
3. at the beginning of micro-batch streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.

followup: support operator pushdown for stream sources

## How was this patch tested?

existing tests

Closes apache#23430 from cloud-fan/micro-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))

case _ => loadV1Source(paths: _*)
Hi @cloud-fan. I have a minor question here: how do we load data from a source that only extends `SupportsRead`?
Things have changed now with the table capability API. Please check the new code.
OK, thanks a lot.
What changes were proposed in this pull request?
This is the first step of the data source v2 API refactor proposal
It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources.
More concretely, it adds (a sketch of how these pieces fit together follows at the end of this description):
1. `TableProvider`, which works like an anonymous catalog
2. `Table`, which represents a structured data set
3. `ScanBuilder` and `Scan`, a logical representation of a data source scan
4. `Batch`, a physical representation of a data source batch scan

How was this patch tested?
existing tests
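As referenced in the description above, an illustrative sketch of how these new abstractions relate (simplified signatures; not the exact Spark interfaces):

import org.apache.spark.sql.types.StructType

trait InputPartition
trait PartitionReaderFactory

// Physical representation of a batch scan: plans input partitions and readers.
trait Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

// Logical scan; batch-capable tables turn it into a Batch.
trait Scan {
  def readSchema(): StructType
  def toBatch: Batch =
    throw new UnsupportedOperationException("Batch scans are not supported.")
}

// Created per query; pushdown mix-ins would hang off of this.
trait ScanBuilder {
  def build(): Scan
}

// A structured data set; the entry point for reads.
trait Table {
  def schema(): StructType
  def newScanBuilder(options: Map[String, String]): ScanBuilder
}

// Works like an anonymous catalog: returns a Table for the given options.
trait TableProvider {
  def getTable(options: Map[String, String]): Table
}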