[SPARK-25528][SQL] data source V2 read side API refactoring #22547
Conversation
  }.toArray
}

override def createContinuousScan(start: Offset): ContinuousScan = {
  val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
}

override def planInputPartitions(): Array[InputPartition] = {
  val startPartitionOffsets = start.partitionToOffsets
    checkpointLocation: String,
    config: ScanConfig,
    options: DataSourceOptions): MicroBatchInputStream = {
  val parameters = options.asMap().asScala.toMap
    checkpointLocation: String,
    config: ScanConfig,
    options: DataSourceOptions): ContinuousInputStream = {
  val parameters = options.asMap().asScala.toMap
    if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
  scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
query.lastExecution.logical.collectFirst {
Now the known partitions are tracked by the KafkaContinuousInputStream in the logical plan: https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62
Test build #96570 has finished for PR 22547 at commit
Test build #97206 has finished for PR 22547 at commit
@cloud-fan, sorry to look at this so late, I was out on vacation for a little while. Is this about ready for review?
Hi @rdblue, welcome back! I just rebased it, so it's ready for review :)
 *
 * @param config a {@link ScanConfig} which may contain operator pushdown information.
 * @param options the user-specified options, which are the same as the ones used to create the
 *                {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
Another choice is to let `ScanConfig` carry the options. But `ScanConfig` is an interface, and doing this would put more work on the user side, so I decided to pass the options again here. Feedback is welcome!
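For illustration, a minimal Scala sketch of that alternative, with a trait standing in for the Java interface; the `options` accessor is an assumption for this sketch, not something defined in the PR:

```scala
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical alternative: ScanConfig keeps the options it was built with, so methods like
// createBatchScan would not need a second options parameter.
trait ScanConfig {
  // Assumed accessor; every ScanConfig implementation would have to store the options itself,
  // which is the extra burden on implementors mentioned above.
  def options: DataSourceOptions
}
```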
I don't think that options should be passed twice.
@@ -90,6 +140,8 @@ class ContinuousExecution(
      do {
        runContinuous(sparkSessionForStream)
      } while (state.updateAndGet(stateUpdate) == ACTIVE)

      stopSources()
With the new abstraction, we should only stop sources when the streaming query ends, instead of at each reconfiguration.
@@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLoca
      s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
  }

  private val numPartitions = {
@@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest {
      "rate source does not support user-specified schema"))
  }

  test("continuous in registry") {
We don't need this test now. With the new `Format` abstraction, the lookup logic is unified between micro-batch and continuous.
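As context, a hedged sketch of what that unification means in practice, using only the built-in rate source, console sink, and standard trigger APIs (nothing below is specific to this PR): the same format name is resolved once, and the trigger decides whether the query runs as micro-batch or continuous.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").appName("unified-lookup").getOrCreate()

// One format lookup serves both execution modes.
val rates = spark.readStream.format("rate").load()

// Micro-batch execution.
val microBatch = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

// Continuous execution of the same source.
val continuous = rates.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

microBatch.stop()
continuous.stop()
```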
val v1Source = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
val v2Source = spark.readStream.format(classOf[FakeFormat].getName).load().select("a")

Seq(v1Source, v2Source).foreach { df =>
improve this test to make sure v2 also works.
@@ -381,7 +390,7 @@ class StreamSuite extends StreamTest {

  test("insert an extraStrategy") {
    try {
-     spark.experimental.extraStrategies = TestStrategy :: Nil
+     spark.experimental.extraStrategies = CustomStrategy :: Nil
Since we need to do a temporary planning pass for streaming queries, we can't allow a custom strategy to remove streaming leaf nodes.
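As a reference point, a hedged sketch of the kind of extra strategy such a test can safely install (the actual `CustomStrategy` in this PR may look different): it claims no plan nodes, so it can never remove streaming leaf nodes.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A no-op extra strategy: it never matches anything, so it leaves all nodes, including
// streaming leaves, to the built-in strategies.
object NoOpStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// Installed the same way as in the test above:
// spark.experimental.extraStrategies = NoOpStrategy :: Nil
```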
A major part of this PR is to update existing streaming sources, which is just moving code around. There are 3 things we need to pay attention to during review:
Test build #97538 has finished for PR 22547 at commit
Test build #97540 has finished for PR 22547 at commit
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
 * scenarios, where some offsets after the specified initial ones can't be
 * properly read.
 */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
Is it possible to break this change into multiple PRs for batch, microbatch, and continuous? It's really large and it would be nice if we could get the changes in incrementally.
+1 for this. A lot of the changes right now are for moving around the streaming code especially, which makes it harder to isolate just the proposed API for review.
An alternative is to split this PR into separate commits: the individual commits may not compile because of mismatched signatures, but all the commits taken together would, and each commit could be reviewed individually to assess the API and then the implementation.
For example I'd propose 3 PRs:
- Batch reading, with a commit for the interface changes and a separate commit for the implementation changes
- Micro Batch Streaming read, with a commit for the interface changes and a separate commit for the implementation changes
- Continuous streaming read, similar to above
Thoughts?
I'd prefer that the commits themselves compile, but since this is separating the modes I think it could be done incrementally.
Makes sense. I really consider this to be a blocker on getting this merged and approved. It's difficult to have confidence in a review over such a large change. Thoughts @cloud-fan @rdblue?
Yeah, I'll separate this PR into 3 smaller ones after we have agreed on the high-level design at https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing
@cloud-fan, is there a design doc that outlines these changes and the new API structure? Also, I don't agree that we should investigate logical/physical scan later. If this goes in, we've effectively decided not to have a unified abstraction between batch and streaming.
  .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
  .map { k => k.drop(6).toString -> parameters(k) }
  .toMap

object KafkaTable extends Table
Why is `KafkaTable` an object rather than a class? This doesn't seem to fit the abstraction.
/**
 * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
 */
PartitionReaderFactory createReaderFactory();
Why are `BatchScan` and `PartitionReaderFactory` different interfaces?
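To make the question concrete, a self-contained Scala sketch of one possible merged shape (the real interfaces are Java; everything below is an assumption except the names quoted from the diff):

```scala
// Stand-ins so the sketch compiles on its own.
trait InputPartition
trait PartitionReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}

// Hypothetical merged interface: the scan both plans partitions and creates their readers,
// instead of delegating reader creation to a separate PartitionReaderFactory.
trait BatchScan {
  def planInputPartitions(): Array[InputPartition]
  def createReader(partition: InputPartition): PartitionReader[AnyRef]
}
```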
 * @param options the user-specified options, which are the same as the ones used to create the
 *                {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
 */
BatchScan createBatchScan(ScanConfig config, DataSourceOptions options);
Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why not have `ScanConfigBuilder` return a `BatchScan` directly by calling `buildBatch`?
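Again, to give the question a concrete shape, a self-contained Scala sketch (only `buildBatch` comes from the comment above; the rest is assumed):

```scala
// Stand-ins so the sketch compiles on its own.
trait InputPartition
trait PartitionReaderFactory

trait BatchScan {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

// Hypothetical: the builder produces the scan directly, removing the intermediate ScanConfig.
trait ScanConfigBuilder {
  def buildBatch(): BatchScan
}
```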
 */
PartitionReaderFactory createReaderFactory(ScanConfig config);

default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions options) {
I think it should be clear that these are scan-specific options. Maybe add some documentation with an example of something that would be passed to configure a scan, like a target split size for combining.
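For example (the option name and source name below are made up purely for illustration; only the DataFrameReader API itself is standard):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("scan-options").getOrCreate()

// The kind of scan-specific option the javadoc could mention: a hint used while planning the
// scan, such as a target split size for combining small files.
val df = spark.read
  .format("com.example.MyV2Source")                    // hypothetical v2 source
  .option("targetSplitSizeBytes", 128L * 1024 * 1024)  // hypothetical scan option
  .load()
```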
 */
interface StreamingReadSupport extends ReadSupport {
@InterfaceStability.Evolving
public interface InputStream extends BaseStreamingSource {
`InputStream` conflicts with a well-known JVM class, `java.io.InputStream`. I think this should be renamed to be more specific to a streaming table scan.
    userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
  val readSupport = source.createReadSupport(options, userSpecifiedSchema)
  val output = readSupport.fullSchema().toAttributes
    userSpecifiedSchema: Option[StructType] = None): Option[DataSourceV2Relation] = {
This shouldn't return an option. A relation is not a read-side structure, it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method.
Creating a relation from a Table should always work, even if the table isn't readable or isn't writable. Analysis can be done later to validate whether the plan that contains a relation can actually use the table.
After looking at the changes, I want to reiterate the request for a design doc. I think that code is a great way to prototype a design, but we need to step back and make sure that the design makes sense when you view it from a high level. I have two main motivations for that point.

First, there are some classes that I don't see a justification for, like having a separate ScanConfig, BatchScan, and PartitionReaderFactory. Are all of those separate classes necessary? Can a ScanConfigBuilder return a BatchScan? Can BatchScan expose a createBatchReader(InputPartition) method?

My second motivation for saying we need a clear design doc is that I think the current way to interact with v2 doesn't fit well with catalogs. This is based around Format, which is based on the v1 method of loading read and write implementations. But that isn't the primary way that v2 will be used. It happens to be the only way to call into the v2 API from Spark today, but the primary use of v2 is to integrate sources that are actually modeled as tables in some catalog.

For example, Format exposes getTable that returns a Table implementation from DataSourceOptions. Those options have tableName and databaseName methods. But tables that are identified by name shouldn't be loaded by a Format, they should be loaded by a catalog. It also uses the options for both table options and read options because there isn't a way to pass both. But most tables will be created with table options by a catalog and will accept read-specific options passed to the DataFrameReader.

I think we would approach a usable API much sooner if this work were planned based on a shared understanding of how catalogs and tables will interact in the future. Not having a catalog API right now is affecting the way tables work in this PR, and that's a concern for me.
 * The major responsibility of this interface is to return a {@link Table} for read/write.
 */
@InterfaceStability.Evolving
public interface Format extends DataSourceV2 {
Why is there both Format and DataSourceV2? What does DataSourceV2 do?
The write API has not been migrated yet and still needs DataSourceV2.
/**
 * Return a {@link Table} instance to do read/write with user-specified options.
 *
 * @param options the user-specified options that can identify a table, e.g. path, table name,
Why is it necessary to pass table name and database to Format? Format should only be used in 2 places to create tables. First, in the DataFrameReader (or writer) API when a format is specified directly instead of a catalog/database/table or catalog/path. Second, it would be used in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider.
I agree that we need a shared understanding of the relationship between this work and the new catalog API. I was not under the impression that the primary purpose of v2 is to integrate catalog tables.
@jose-torres, I don't mean that the primary purpose of the v2 API is catalog integration; I mean that the primary use of v2 is with tables that are stored in some catalog. So we should make sure that the plan and design work well with catalog tables. Another reason that catalog tables are important is that the v2 plans require a catalog for consistent behavior. So catalogs are important, and I think they will affect the implementation details.
Let's move the high-level discussion to https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing
@cloud-fan @rdblue I believe we've converged on an appropriate API as per our last sync. Do we have a plan to move this forward with the separated smaller patches?
I agree that there is consensus for the proposal in the design doc, and I don't think there are any blockers. If there's something I can do to help, please let me know. Otherwise ping me to review!
I was stuck with some personal business recently; I'll send a PR for the batch source after the weekend.
What changes were proposed in this pull request?
Refactor the read side API according to the abstraction proposed in the dev list.
More concretely, this PR:
- introduces `Format`, an entry point that can return a `Table`
- renames `ReadSupportProvider` to `Table`, which represents a logical data set with a schema
- adds `InputStream` to represent a streaming source in a streaming query; it can create `Scan`s
- renames `ReadSupport` to `Scan`; each `Scan` triggers one Spark job (like an RDD)

These pieces fit together roughly as in the sketch below.
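The following is a self-contained Scala sketch reconstructed from the fragments quoted in this review (the real interfaces in the PR are Java; any member not quoted in the diff hunks above is an assumption made for illustration, and the micro-batch and continuous variants are collapsed into one `InputStream`):

```scala
// Stand-ins so the sketch compiles on its own.
trait DataSourceOptions
trait ScanConfig
trait ScanConfigBuilder { def build(): ScanConfig }
trait InputPartition
trait PartitionReaderFactory
trait Offset

// One Scan triggers one Spark job (like an RDD).
trait Scan {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}
trait BatchScan extends Scan
trait ContinuousScan extends Scan

// A streaming source inside a streaming query; it creates Scans as the query progresses.
trait InputStream {
  def createContinuousScan(start: Offset): ContinuousScan  // signature quoted in the Kafka diff
}

// A logical data set with a schema; it hands out batch scans and streaming input streams.
trait Table {
  def newScanConfigBuilder(options: DataSourceOptions): ScanConfigBuilder
  def createBatchScan(config: ScanConfig, options: DataSourceOptions): BatchScan
  def createContinuousInputStream(
      checkpointLocation: String,
      config: ScanConfig,
      options: DataSourceOptions): InputStream
}

// The entry point: returns a Table for read/write.
trait Format {
  def getTable(options: DataSourceOptions): Table
}
```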
A major change of this PR is about how we execute streaming queries.

Previously:
1. Create `StreamingRelationV2`.
2. Convert `StreamingRelationV2` to `StreamingExecutionRelation`.
3. Convert `StreamingExecutionRelation` to `StreamingDataSourceV2Relation`, then apply query optimization and planning, and run.

Currently:
1. Create `StreamingRelationV2`.
2. Convert `StreamingRelationV2` to `MicroBatch/ContinuousExecutionRelation`.
3. Convert `MicroBatch/ContinuousExecutionRelation` to `StreamingDataSourceV2Relation`.
4. With `StreamingDataSourceV2Relation`, do query optimization and planning, and run. Note that the operator pushdown in the query optimization of this step will be exactly the same as in step 3.

followup:
How was this patch tested?
existing tests.