[SPARK-27190][SQL] add table capability for streaming #24129
Changes from all commits: b13725d, 04c2f4c, c86fb8a, e022da0, 953b77d, 30ca36c
Three files are deleted in this diff (contents not shown).
```diff
@@ -21,8 +21,6 @@
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
-import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableCapability;
```
```diff
@@ -74,8 +72,8 @@ default Batch toBatch() {
   /**
    * Returns the physical representation of this scan for streaming query with micro-batch mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsMicroBatchRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query
```
```diff
@@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
   /**
    * Returns the physical representation of this scan for streaming query with continuous mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsContinuousRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query
```

Review thread on these javadoc changes:

Reviewer: These are non-functional changes and should not be in this commit.

Author: How can we move this?

Reviewer: You're right, sorry, I missed the links that are removed. This change is fine.
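For context on what the updated javadoc refers to: under the capability model, a table reports streaming scan support through Table#capabilities() instead of implementing the removed SupportsMicroBatchRead / SupportsContinuousRead marker interfaces. A minimal sketch follows (not from this PR; the class, table name, and schema are hypothetical, and it assumes the Table interface at this revision requires only name(), schema(), and capabilities()):

```scala
import java.util

import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical source that advertises streaming scan support through its
// capability set rather than through the removed marker interfaces.
class ExampleStreamTable extends Table {
  override def name(): String = "example_stream"

  override def schema(): StructType = new StructType().add("value", "string")

  // A table that would previously have implemented SupportsMicroBatchRead and
  // SupportsContinuousRead now reports both capabilities here. Per the javadoc
  // above, Scan.toMicroBatchStream and Scan.toContinuousStream are only
  // expected to be overridden when the corresponding capability is reported.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
}
```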
New file: V2StreamingScanSupportCheck.scala (@@ -0,0 +1,64 @@)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}

/**
 * This rule adds some basic table capability checks for streaming scans, without knowing the
 * actual streaming execution mode.
 */
object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): Unit = {
```
Review thread on this method:

Reviewer: I think these checks should also include a validation for individual tables. If a table doesn't support streaming at all, a more helpful error message is that a specific table doesn't support streaming, not just that there isn't a streaming mode that can handle all of the sources.

Author: If we add this check here, it will never be hit, because we already checked it earlier in …

Author: I think eventually we will have an analyzer rule that checks streaming scan capability and falls back to v1 if necessary. This checker rule is not suitable for that, because it just traverses the plan without returning a new plan, so we can't implement the fallback logic in this rule.

Reviewer: Even if it will never be hit, it should still be in the analyzer so that plans created through other paths in the future are still checked. The idea is to avoid relying on a particular way of creating a logical plan to validate logical plans.

Author: That's why I opened https://issues.apache.org/jira/browse/SPARK-27483: plans that are created through other APIs (not …

Reviewer: Why would this check depend on fallback logic? These checks run after resolution rules have reached a fixed point. If there is a streaming DSv2 relation in the plan, fallback should already be done. Fallback logic is separate, and this check can be done here.

Author: It's the opposite: the fallback logic depends on the check. That said, the future analyzer rule would do 3 things: … You can see it's not a simple check anymore, and it cannot be done within this simpler checker rule.

Reviewer: @cloud-fan, let's get the validation in now. I don't think the fallback rule should be implemented as you describe. It should be done in 2 parts: the rule that falls back and updates the plan, and this validation that all sources support streaming. Spark should not combine transform rules and validations; there are a couple of reasons for this principle: … So we will need a validation rule either way. When the fallback rule runs and can't fix the problem, this check should be what fails the plan.

Author: Added.
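As a side note on the rule shapes debated above, here is a rough sketch (object names hypothetical) of the difference: a checker is a plain LogicalPlan => Unit function that can only veto a plan, while a transform rule returns a new plan and so is the shape any v1-fallback logic would need.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Checker: same shape as V2StreamingScanSupportCheck. It can traverse the
// plan and throw an AnalysisException, but it cannot rewrite anything.
object ExampleCheck extends (LogicalPlan => Unit) {
  override def apply(plan: LogicalPlan): Unit = {
    // inspect `plan`; throw on an invalid node
  }
}

// Transform rule: returns a (possibly rewritten) plan, so fallback logic
// that replaces unsupported v2 relations with v1 ones would live here.
object ExampleFallbackRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan // rewrite the plan here
  }
}
```

The body of apply() continues below; the per-table validation at its top is the check added in response to this thread.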
```scala
    // Continuation of V2StreamingScanSupportCheck.apply().
    // Per-table check: each v2 streaming relation must support at least one
    // streaming scan mode.
    plan.foreach {
      case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
        throw new AnalysisException(
          s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
      case _ =>
    }

    val streamingSources = plan.collect {
      case r: StreamingRelationV2 => r.table
    }
    val v1StreamingRelations = plan.collect {
      case r: StreamingRelation => r
    }

    // Cross-source check: when there are multiple streaming sources, they must
    // share at least one supported execution mode.
    if (streamingSources.length + v1StreamingRelations.length > 1) {
      val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
      // v1 streaming data source only supports micro-batch.
      val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
        v1StreamingRelations.isEmpty
      if (!allSupportsMicroBatch && !allSupportsContinuous) {
        val microBatchSources =
          streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
            v1StreamingRelations.map(_.sourceName)
        val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
        throw new AnalysisException(
          "The streaming sources in a query do not have a common supported execution mode.\n" +
          "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
          "Sources support continuous: " + continuousSources.mkString(", "))
      }
    }
  }
}
```
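The supports and supportsAny calls above come in through `import DataSourceV2Implicits._`; that file is not part of this diff, but the helpers it provides plausibly look like the following sketch (hypothetical object name, assuming Table#capabilities() returns a java.util.Set[TableCapability]):

```scala
import org.apache.spark.sql.sources.v2.{Table, TableCapability}

// Sketch of the implicit helpers the rule relies on; the real definitions
// live in DataSourceV2Implicits, which this diff does not show.
object ExampleImplicits {
  implicit class TableHelper(table: Table) {
    // True if the table reports the given capability.
    def supports(capability: TableCapability): Boolean =
      table.capabilities().contains(capability)

    // True if the table reports at least one of the given capabilities.
    def supportsAny(capabilities: TableCapability*): Boolean =
      capabilities.exists(supports)
  }
}
```

With both checks in place, a table with no streaming capability fails analysis with a per-table error, and a query mixing a micro-batch-only source with a continuous-only source fails with the "no common supported execution mode" error, listing each source under the mode it does support.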
Review thread on the javadoc changes elsewhere in this commit:

Reviewer: Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.

Reviewer: Same with the changes in WriteBuilder.

Author: It's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76. The only unnecessary change is the doc for batch: I replaced BATCH_READ with a javadoc link, to be consistent with streaming.

Reviewer: Javadoc changes are non-functional changes. If you want to correct javadoc, limit updates to files that are already changing in the PR. As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.

Author: I don't agree with this. I added docs for streaming and changed the doc for batch a little to match the streaming doc style. It's very annoying to separate out minor changes like this. I consider this change self-contained.

Reviewer: I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.

Author: Did I miss something here? The docs changes are totally related to the current PR (the doc would otherwise refer to deleted interfaces).

Reviewer: The docs changes that I was referring to were moved already; this was just a poorly placed comment.