[SPARK-27190][SQL] add table capability for streaming #24129

Closed
wants to merge 6 commits into from
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.{Collections, Locale, UUID}
import java.util.{Locale, UUID}

import scala.collection.JavaConverters._

@@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
with SupportsRead with SupportsWrite with BaseStreamingSink {

override def name(): String = s"Kafka $strategy"

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
override def capabilities(): ju.Set[TableCapability] = {
Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)
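For context, a minimal sketch (not part of this PR's diff) of the pattern this change establishes for a micro-batch-only source: the table mixes in the generic SupportsRead trait and advertises streaming support through capabilities(), instead of implementing the removed marker interfaces. ExampleStreamingTable and ExampleScan are hypothetical names; ExampleScan is sketched further down, after the Scan.java hunk.

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical example table: streaming support is declared via capabilities(),
// not via the removed SupportsMicroBatchRead / SupportsContinuousRead interfaces.
class ExampleStreamingTable extends Table with SupportsRead {
  override def name(): String = "example-streaming-table"

  override def schema(): StructType = new StructType().add("value", "string")

  // The capability set replaces the old marker interfaces.
  override def capabilities(): ju.Set[TableCapability] =
    Set[TableCapability](MICRO_BATCH_READ).asJava

  // ScanBuilder is a single-method interface, so a lambda works, as in KafkaTable above.
  // ExampleScan is hypothetical; a sketch of it follows the Scan.java hunk below.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    () => new ExampleScan(options)
}
```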

This file was deleted.

This file was deleted.

This file was deleted.

@@ -33,6 +33,16 @@ public enum TableCapability {
*/
BATCH_READ,

/**
* Signals that the table supports reads in micro-batch streaming execution mode.
*/
MICRO_BATCH_READ,

/**
* Signals that the table supports reads in continuous streaming execution mode.
*/
CONTINUOUS_READ,

/**
* Signals that the table supports append writes in batch execution mode.
* <p>
@@ -42,6 +52,15 @@ public enum TableCapability {
*/
BATCH_WRITE,

/**
* Signals that the table supports append writes in streaming execution mode.
* <p>
* Tables that return this capability must support appending data and may also support additional
* write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
* {@link #OVERWRITE_DYNAMIC}.
*/
STREAMING_WRITE,

/**
* Signals that the table can be truncated in a write operation.
* <p>
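As a hedged illustration of how a consumer of these new capability values might use them (only Table and TableCapability are the real DSv2 interfaces; the helper itself is hypothetical), a query-level helper could look for an execution mode that every streaming source declares. The check this PR actually adds is V2StreamingScanSupportCheck, shown further below.

```scala
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.TableCapability._

object StreamingModeHelper {
  // Pick a streaming execution mode that every source table in a query supports.
  def commonStreamingMode(tables: Seq[Table]): Option[String] = {
    if (tables.forall(_.capabilities().contains(MICRO_BATCH_READ))) {
      Some("micro-batch")
    } else if (tables.forall(_.capabilities().contains(CONTINUOUS_READ))) {
      Some("continuous")
    } else {
      None // no shared mode; such a query should be rejected during analysis
    }
  }
}
```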
@@ -21,8 +21,6 @@
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
Contributor

Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.

Contributor

Same with the changes in WriteBuilder.

Contributor Author

it's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76

The only unnecessary change is the doc for batch. I replaced BATCH_READ with a Javadoc link, to be consistent with streaming.

Contributor

Javadoc changes are non-functional changes. If you want to correct Javadoc, limit updates to files that are already changing in the PR.

As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.

Contributor Author

I don't agree with this. I added docs for streaming and changed the batch doc a little to match the streaming doc style. It's very annoying to separate out a minor change like this. I consider this change self-contained.

Contributor

I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.

Contributor

Did I miss something here? The doc changes are totally related to the current PR (otherwise the docs would refer to deleted interfaces).

Contributor

The doc changes that I was referring to were moved already; this was just a poorly placed comment.


@@ -74,8 +72,8 @@ default Batch toBatch() {
/**
* Returns the physical representation of this scan for streaming query with micro-batch mode. By
* default this method throws exception, data sources must overwrite this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsMicroBatchRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
@@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
/**
* Returns the physical representation of this scan for streaming query with continuous mode. By
* default this method throws exception, data sources must overwrite this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsContinuousRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
Contributor

These are non-functional changes and should not be in this commit.

Contributor Author

How can we move this? SupportsContinuousRead is removed in this PR and TableCapability#CONTINUOUS_READ is added in this PR.

Contributor

You're right, sorry I missed the links that are removed. This change is fine.

*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
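Continuing the hypothetical ExampleStreamingTable sketch above (illustrative only, assuming just the Scan methods visible in this hunk): a scan backing a table that declares MICRO_BATCH_READ overrides toMicroBatchStream, and leaves toContinuousStream at its default throwing behavior because CONTINUOUS_READ is not declared. ExampleMicroBatchStream is a hypothetical placeholder, not a real class.

```scala
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleScan(options: CaseInsensitiveStringMap) extends Scan {
  override def readSchema(): StructType = new StructType().add("value", "string")

  // Overridden only because the owning table declares MICRO_BATCH_READ; toContinuousStream
  // keeps the default (throwing) behavior since CONTINUOUS_READ is not declared.
  // ExampleMicroBatchStream is a hypothetical MicroBatchStream implementation.
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
    new ExampleMicroBatchStream(options, checkpointLocation)
}
```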
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
@@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}

private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
override def capabilities(): util.Set[TableCapability] = {
Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
}
}

private[noop] object NoopWriteBuilder extends WriteBuilder
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}

/**
* This rule adds some basic table capability checks for streaming scans, without knowing the
* actual streaming execution mode.
*/
object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): Unit = {
Contributor

I think these checks should also include a validation for individual tables. If a table doesn't support streaming at all, a more helpful error message is that a specific table doesn't support streaming, not just that there isn't a streaming mode that can handle all of the sources.

Contributor Author

If we add this check here, it will never be hit, because we already check it earlier in DataStreamReader. As I explained, it's non-trivial to move the check from DataStreamReader to here, because it's coupled with the v2 -> v1 fallback logic.

Contributor Author

I think eventually we will have an analyzer rule that checks streaming scan capability and falls back to v1 if necessary. This checker rule is not suitable for that, because it just traverses the plan without returning a new plan, so we can't implement the fallback logic in this rule.

Contributor

Even if it will never be hit, it should still be in the analyzer so that plans that are created through other paths in the future are still checked. The idea is to avoid relying on a particular way of creating a logical plan to validate logical plans.

Contributor Author

That's why I opened https://issues.apache.org/jira/browse/SPARK-27483

Plans that are created through other APIs (not DataStreamReader) still need the fallback logic, which cannot be done within this checker rule.

Contributor

Why would this check depend on fallback logic? These checks run after resolution rules have reached a fixed point. If there is a streaming DSv2 relation in the plan, fallback should already be done. Fallback logic is separate and this check can be done here.

Contributor Author

> Why would this check depend on fallback logic?

It's the opposite: the fallback logic depends on the check. That said, the future analyzer rule would do three things:

  1. find the v2 streaming relations in the plan and check their scan capabilities
  2. if the check fails, fall back to a v1 relation and use it to replace the v2 streaming relation
  3. if fallback is not applicable, fail

You can see that it's not a simple check anymore, and it cannot be done within this simple checker rule (LogicalPlan => Unit).
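A rough sketch of what that future rule could look like, illustrative only and not part of this PR: it would be a plan transform (Rule[LogicalPlan]) rather than a LogicalPlan => Unit check, and lookupV1Fallback is a hypothetical stand-in for the v2 -> v1 fallback lookup.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}

// Hypothetical rule name; not part of this PR.
object V2StreamingScanFallback extends Rule[LogicalPlan] {

  // Hypothetical helper: look up a v1 StreamingRelation to fall back to, if one exists.
  private def lookupV1Fallback(r: StreamingRelationV2): Option[StreamingRelation] = ???

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    // Step 1: find v2 streaming relations whose table supports neither streaming scan mode.
    case r: StreamingRelationV2
        if !r.table.capabilities().contains(MICRO_BATCH_READ) &&
          !r.table.capabilities().contains(CONTINUOUS_READ) =>
      // Step 2: replace it with the v1 relation if a fallback exists; step 3: otherwise fail.
      lookupV1Fallback(r).getOrElse {
        throw new AnalysisException(
          s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
      }
  }
}
```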

Contributor
@rdblue Apr 19, 2019

@cloud-fan, let's get the validation in now.

I don't think that the fallback rule should be implemented as you describe. I think it should be done in two parts: the rule to fall back and update the plan, and this validation that all sources support streaming.

Spark should not combine transform rules and validations. There are a couple of reasons for this principle:

  1. Validations are used to ensure that the query is valid and to ensure that rules are run correctly. If the transform rule is added to the analyzer in a single-run batch, we want validation to catch that. These checks catch errors in Spark, too.
  2. Rules should be as small as possible and focused on a single task. The fallback rule should not fail analysis if it doesn't know what to do because some other rule may be added later that does. For example, what if we build an adapter from continuous execution to micro-batch execution for a source?

So we will need a validation rule either way. When the fallback rule runs and can't fix the problem, this check should be what fails the plan.

Contributor Author

added.

    plan.foreach {
      case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
        throw new AnalysisException(
          s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
      case _ =>
    }

    val streamingSources = plan.collect {
      case r: StreamingRelationV2 => r.table
    }
    val v1StreamingRelations = plan.collect {
      case r: StreamingRelation => r
    }

    if (streamingSources.length + v1StreamingRelations.length > 1) {
      val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
      // v1 streaming data source only supports micro-batch.
      val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
        v1StreamingRelations.isEmpty
      if (!allSupportsMicroBatch && !allSupportsContinuous) {
        val microBatchSources =
          streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
            v1StreamingRelations.map(_.sourceName)
        val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
        throw new AnalysisException(
          "The streaming sources in a query do not have a common supported execution mode.\n" +
            "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
            "Sources support continuous: " + continuousSources.mkString(", "))
      }
    }
  }
}