Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27190][SQL] add table capability for streaming #24129

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.{Collections, Locale, UUID}
import java.util.{Locale, UUID}

import scala.collection.JavaConverters._

Expand All @@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
Expand Down Expand Up @@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
with SupportsRead with SupportsWrite with BaseStreamingSink {

override def name(): String = s"Kafka $strategy"

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
override def capabilities(): ju.Set[TableCapability] = {
Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public enum TableCapability {
*/
BATCH_READ,

/**
* Signals that the table supports reads in micro-batch streaming execution mode.
*/
MICRO_BATCH_READ,

/**
* Signals that the table supports reads in continuous streaming execution mode.
*/
CONTINUOUS_READ,

/**
* Signals that the table supports append writes in batch execution mode.
* <p>
Expand All @@ -42,6 +52,15 @@ public enum TableCapability {
*/
BATCH_WRITE,

/**
* Signals that the table supports append writes in streaming execution mode.
* <p>
* Tables that return this capability must support appending data and may also support additional
* write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
* {@link #OVERWRITE_DYNAMIC}.
*/
STREAMING_WRITE,

/**
* Signals that the table can be truncated in a write operation.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the changes in WriteBuilder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76

The only unnecessary change is the doc for batch. I replaced BATCH_READ with a Javadoc link, to be consistent with streaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc changes are non-functional changes. If you want to correct Javadoc, limit updates to files that are already changing in the PR.

As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with this. I added docs for streaming and changed the doc for batch a little bit to match the streaming doc style. It's very annoying to separate a minor change like this. I do consider this change self-contained.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I miss something here? The docs changes are totally related to the current PR (the doc would otherwise refer to a deleted interface).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs changes that I was referring to were moved already, this was just a poorly placed comment.


Expand Down Expand Up @@ -74,8 +72,8 @@ default Batch toBatch() {
/**
* Returns the physical representation of this scan for streaming query with micro-batch mode. By
* default this method throws an exception; data sources must override this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsMicroBatchRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
Expand All @@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
/**
* Returns the physical representation of this scan for streaming query with continuous mode. By
* default this method throws an exception; data sources must override this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsContinuousRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are non-functional changes and should not be in this commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we move this? SupportsContinuousRead is removed in this PR and TableCapability#CONTINUOUS_READ is added in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, sorry I missed the links that are removed. This change is fine.

*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
Expand All @@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}

private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
override def capabilities(): util.Set[TableCapability] = {
Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
}
}

private[noop] object NoopWriteBuilder extends WriteBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.streaming

import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}

import org.apache.spark.sql.{Dataset, SparkSession}
Expand Down Expand Up @@ -78,6 +77,7 @@ class MicroBatchExecution(
val disabledSources =
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
Expand All @@ -88,31 +88,33 @@ class MicroBatchExecution(
logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
StreamingExecutionRelation(source, output)(sparkSession)
})
case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _)
if !disabledSources.contains(ds.getClass.getCanonicalName) =>
v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
if (v1Relation.isEmpty) {
throw new UnsupportedOperationException(
s"Data source $dsName does not support microbatch processing.")
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName' [$ds]")
StreamingExecutionRelation(source, output)(sparkSession)
})

case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
val v2Disabled = disabledSources.contains(src.getClass.getCanonicalName)
if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' [$src]")
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
} else if (v1.isEmpty) {
throw new UnsupportedOperationException(
s"Data source $srcName does not support microbatch processing.")
} else {
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = v1.get.dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this wasn't introduced by the current PR, but this shouldn't say DataSourceV2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we mention something about v1 and v2? Some sources have both v1 and v2 implementations and it might be helpful to have a log saying which implementation is actually used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be valuable to include whether v1 or v2 is used when the source supports both.

StreamingExecutionRelation(source, output)(sparkSession)
})
}
}
sources = _logicalPlan.collect {
// v1 source
Expand All @@ -123,7 +125,7 @@ class MicroBatchExecution(
uniqueSources = sources.distinct

sink match {
case s: SupportsStreamingWrite =>
case s: SupportsWrite =>
val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)

Expand Down Expand Up @@ -519,7 +521,7 @@ class MicroBatchExecution(

val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case _: SupportsStreamingWrite =>
case _: SupportsWrite =>
newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
Expand Down Expand Up @@ -550,7 +552,7 @@ class MicroBatchExecution(
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
case _: SupportsStreamingWrite =>
case _: SupportsWrite =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
Expand Down Expand Up @@ -582,7 +582,7 @@ abstract class StreamExecution(
}

protected def createStreamingWrite(
table: SupportsStreamingWrite,
table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.streaming

import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
Expand Down Expand Up @@ -60,13 +61,15 @@ class ConsoleSinkProvider extends TableProvider
def shortName(): String = "console"
}

object ConsoleTable extends Table with SupportsStreamingWrite {
object ConsoleTable extends Table with SupportsWrite with BaseStreamingSink {

override def name(): String = "console"

override def schema(): StructType = StructType(Nil)

override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
override def capabilities(): util.Set[TableCapability] = {
Set(TableCapability.STREAMING_WRITE).asJava
}

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
Expand Down
Loading