[SPARK-27190][SQL] add table capability for streaming #24129
@@ -21,8 +21,6 @@
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
-import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableCapability;
@@ -74,8 +72,8 @@ default Batch toBatch() {
   /**
    * Returns the physical representation of this scan for streaming query with micro-batch mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsMicroBatchRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query
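To make the new contract concrete, here is a rough Scala sketch of a scan whose owning table reports MICRO_BATCH_READ and which therefore overrides toMicroBatchStream. The class and the buildStream helper are hypothetical, and the package names and signatures are inferred from the imports shown in this diff, so treat it as a sketch rather than code from the PR.

```scala
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Hypothetical scan for a table that declares TableCapability.MICRO_BATCH_READ.
// Because the capability is declared, the engine will call toMicroBatchStream,
// so the default "throws exception" implementation must be overridden.
class MyMicroBatchScan(schema: StructType) extends Scan {
  override def readSchema(): StructType = schema

  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
    // checkpointLocation is the per-source scratch path described in the Javadoc;
    // a real source would keep offsets/metadata there for failure recovery.
    buildStream(checkpointLocation)
  }

  // Placeholder for source-specific stream construction.
  private def buildStream(checkpointLocation: String): MicroBatchStream = ???
}
```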
@@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
   /**
    * Returns the physical representation of this scan for streaming query with continuous mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsContinuousRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query

Review thread on this change:
- These are non-functional changes and should not be in this commit.
- How can we move this?
- You're right, sorry I missed the links that are removed. This change is fine.
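On the table side, the capability replaces the removed marker interfaces. The following hedged sketch shows how a SupportsRead table might now advertise both streaming read modes through capabilities(); the class name is made up, and schema()/name()/newScanBuilder are left abstract so the sketch does not guess their exact signatures at this commit.

```scala
import java.util

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}

// Hypothetical table that opts into streaming reads via capabilities() instead of
// extending the removed SupportsMicroBatchRead/SupportsContinuousRead interfaces.
abstract class MyStreamingSourceTable extends Table with SupportsRead {

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
}
```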
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.streaming

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}

 import org.apache.spark.sql.{Dataset, SparkSession}
@@ -78,6 +77,7 @@ class MicroBatchExecution(
     val disabledSources =
       sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")

+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
@@ -88,31 +88,33 @@ class MicroBatchExecution(
           logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _)
-          if !disabledSources.contains(ds.getClass.getCanonicalName) =>
-        v2ToRelationMap.getOrElseUpdate(s, {
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
-          // TODO: operator pushdown.
-          val scan = table.newScanBuilder(options).build()
-          val stream = scan.toMicroBatchStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream)
-        })
-      case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
-        v2ToExecutionRelationMap.getOrElseUpdate(s, {
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          if (v1Relation.isEmpty) {
-            throw new UnsupportedOperationException(
-              s"Data source $dsName does not support microbatch processing.")
-          }
-          val source = v1Relation.get.dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName' [$ds]")
-          StreamingExecutionRelation(source, output)(sparkSession)
-        })
+      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+        val v2Disabled = disabledSources.contains(src.getClass.getCanonicalName)
+        if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
+          v2ToRelationMap.getOrElseUpdate(s, {
+            // Materialize source to avoid creating it in every batch
+            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+            nextSourceId += 1
+            logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' [$src]")
+            // TODO: operator pushdown.
+            val scan = table.newScanBuilder(options).build()
+            val stream = scan.toMicroBatchStream(metadataPath)
+            StreamingDataSourceV2Relation(output, scan, stream)
+          })
+        } else if (v1.isEmpty) {
+          throw new UnsupportedOperationException(
+            s"Data source $srcName does not support microbatch processing.")
+        } else {
+          v2ToExecutionRelationMap.getOrElseUpdate(s, {
+            // Materialize source to avoid creating it in every batch
+            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+            val source = v1.get.dataSource.createSource(metadataPath)
+            nextSourceId += 1
+            logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
+            StreamingExecutionRelation(source, output)(sparkSession)
+          })
+        }
     }
     sources = _logicalPlan.collect {
       // v1 source

Review comments on the logInfo line above:
- I know this wasn't introduced by the current PR, but this shouldn't say DataSourceV2.
- shall we mention something about v1 and v2? Some sources have both v1 and v2 implementations and it might be helpful to have a log saying which implementation is actually used.
- I think it would be valuable to include whether v1 or v2 is used when the source supports both.
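The new StreamingRelationV2 branch above folds the previous two cases into one capability check. As a distilled illustration (a hypothetical helper, not Spark code), the same decision can be written against the public capabilities() API rather than the internal supports implicit:

```scala
import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}

object MicroBatchPathChooser {
  // Hypothetical distillation of the planner logic above: use the v2 micro-batch scan
  // only when v2 is not disabled and the table reports MICRO_BATCH_READ; otherwise fall
  // back to the v1 source, or fail if no v1 fallback exists.
  def useV2(table: Table, srcName: String, v2Disabled: Boolean, hasV1Fallback: Boolean): Boolean = {
    val readsMicroBatch = table.isInstanceOf[SupportsRead] &&
      table.capabilities().contains(TableCapability.MICRO_BATCH_READ)
    if (!v2Disabled && readsMicroBatch) {
      true
    } else if (!hasV1Fallback) {
      throw new UnsupportedOperationException(
        s"Data source $srcName does not support microbatch processing.")
    } else {
      false
    }
  }
}
```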
@@ -123,7 +125,7 @@ class MicroBatchExecution(
     uniqueSources = sources.distinct

     sink match {
-      case s: SupportsStreamingWrite =>
+      case s: SupportsWrite =>
         val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
         WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
@@ -519,7 +521,7 @@ class MicroBatchExecution(
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case _: SupportsStreamingWrite =>
+      case _: SupportsWrite =>
         newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
@@ -550,7 +552,7 @@ class MicroBatchExecution(
     SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
       sink match {
         case s: Sink => s.addBatch(currentBatchId, nextBatch)
-        case _: SupportsStreamingWrite =>
+        case _: SupportsWrite =>
           // This doesn't accumulate any data - it just forces execution of the microbatch writer.
           nextBatch.collect()
       }
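On the write side, the hunks above swap the SupportsStreamingWrite marker for the plain SupportsWrite interface, so a streaming sink presumably signals support through a STREAMING_WRITE capability instead. A hedged sketch under that assumption (class name hypothetical; schema()/name()/newWriteBuilder left abstract to avoid guessing their signatures at this commit):

```scala
import java.util

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}

// Hypothetical sink table: with SupportsStreamingWrite removed, a streaming sink is a
// SupportsWrite table whose capabilities() include STREAMING_WRITE.
abstract class MyStreamingSinkTable extends Table with SupportsWrite {

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.STREAMING_WRITE)
}
```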
Review thread on the documentation changes:
- Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.
- Same with the changes in WriteBuilder.
- It's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76. The only unnecessary change is the doc for batch. I replaced BATCH_READ with a javadoc link, to be consistent with streaming.
- Javadoc changes are non-functional changes. If you want to correct Javadoc, limit updates to files that are already changing in the PR. As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.
- I don't agree with this. I added the doc for streaming and changed the doc for batch a little to match the streaming doc style. It is very annoying to separate a minor change like this; I consider this change self-contained.
- I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.
- Did I miss something here? The docs changes are totally related to the current PR (the doc would refer to a deleted interface).
- The docs changes that I was referring to were moved already; this was just a poorly placed comment.