[SPARK-27190][SQL] add table capability for streaming
## What changes were proposed in this pull request?

This is a follow-up of #24012, adding the corresponding capabilities for streaming.
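
Concretely, where a source previously mixed in the `SupportsMicroBatchRead`, `SupportsContinuousRead` and `SupportsStreamingWrite` marker interfaces, it now reports what it can do through `Table#capabilities()`. A minimal sketch of the new pattern, assuming the DataSource V2 APIs as of this commit (the class name and stubbed builders are illustrative, not part of this patch; Spark's own tables in the diff below additionally mix in the internal `BaseStreamingSink` trait):

```scala
import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical streaming table: capabilities() replaces the old marker traits.
class ExampleStreamingTable extends Table with SupportsRead with SupportsWrite {
  override def name(): String = "example"
  override def schema(): StructType = new StructType().add("value", "string")

  // Declare micro-batch scan, continuous scan and streaming write support.
  override def capabilities(): JSet[TableCapability] =
    Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava

  // Builders elided; a real source returns its scan/write builders here.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ???
}
```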

## How was this patch tested?

Existing tests.

Closes #24129 from cloud-fan/capability.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Apr 26, 2019
1 parent 2234667 commit 85fd552
Showing 24 changed files with 389 additions and 231 deletions.
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.{Collections, Locale, UUID}
import java.util.{Locale, UUID}

import scala.collection.JavaConverters._

@@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
with SupportsRead with SupportsWrite with BaseStreamingSink {

override def name(): String = s"Kafka $strategy"

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
override def capabilities(): ju.Set[TableCapability] = {
Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)
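With these capabilities advertised, the Kafka table can back both streaming execution modes as a source and also act as a streaming sink. A hedged end-user sketch of what that enables, using the standard Kafka source/sink options (the broker address, topic names and checkpoint path are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[2]").appName("kafka-caps").getOrCreate()

// MICRO_BATCH_READ / CONTINUOUS_READ: the same source definition works in either mode.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "input-topic")                  // placeholder topic name
  .load()

// STREAMING_WRITE: the Kafka table can also terminate a streaming query as the sink.
val query = input
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")                          // placeholder topic name
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // placeholder scratch path
  .trigger(Trigger.Continuous("1 second")) // or omit for the default micro-batch trigger
  .start()
```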

This file was deleted.

This file was deleted.

This file was deleted.

@@ -33,6 +33,16 @@ public enum TableCapability {
*/
BATCH_READ,

/**
* Signals that the table supports reads in micro-batch streaming execution mode.
*/
MICRO_BATCH_READ,

/**
* Signals that the table supports reads in continuous streaming execution mode.
*/
CONTINUOUS_READ,

/**
* Signals that the table supports append writes in batch execution mode.
* <p>
@@ -42,6 +52,15 @@
*/
BATCH_WRITE,

/**
* Signals that the table supports append writes in streaming execution mode.
* <p>
* Tables that return this capability must support appending data and may also support additional
* write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
* {@link #OVERWRITE_DYNAMIC}.
*/
STREAMING_WRITE,

/**
* Signals that the table can be truncated in a write operation.
* <p>
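Since capabilities are now plain enum values in a set, engine or client code can branch on them directly instead of pattern matching on marker interfaces. A small illustrative helper, not part of this patch, that inspects the new values:

```scala
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._

object CapabilityInspection {
  // List the streaming-related capabilities a table reports.
  def streamingCapabilities(table: Table): Seq[String] = {
    val caps = table.capabilities()
    Seq(
      MICRO_BATCH_READ -> "micro-batch read",
      CONTINUOUS_READ -> "continuous read",
      STREAMING_WRITE -> "streaming write"
    ).collect { case (cap, label) if caps.contains(cap) => label }
  }
}
```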
@@ -21,8 +21,6 @@
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;

@@ -74,8 +72,8 @@ default Batch toBatch() {
/**
* Returns the physical representation of this scan for streaming query with micro-batch mode. By
* default this method throws exception, data sources must overwrite this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsMicroBatchRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
@@ -90,8 +88,8 @@ default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
/**
* Returns the physical representation of this scan for streaming query with continuous mode. By
* default this method throws exception, data sources must overwrite this method to provide an
* implementation, if the {@link Table} that creates this scan implements
* {@link SupportsContinuousRead}.
* implementation, if the {@link Table} that creates this scan returns
* {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
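In practice, a source whose table reports MICRO_BATCH_READ and/or CONTINUOUS_READ overrides the matching method on its Scan. A sketch with the stream implementations elided (everything named Example* is hypothetical, not part of this patch):

```scala
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.StructType

// Hypothetical scan for a table that reports MICRO_BATCH_READ and CONTINUOUS_READ.
class ExampleScan(schema: StructType) extends Scan {
  override def readSchema(): StructType = schema

  // Used with the default micro-batch trigger; checkpointLocation is the Hadoop FS
  // scratch path the engine provides for failure recovery.
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
    ??? // a real source builds its MicroBatchStream here

  // Used when the query runs with a continuous trigger.
  override def toContinuousStream(checkpointLocation: String): ContinuousStream =
    ??? // a real source builds its ContinuousStream here
}
```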
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
@@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}

private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
override def capabilities(): util.Set[TableCapability] = {
Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
}
}

private[noop] object NoopWriteBuilder extends WriteBuilder
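With STREAMING_WRITE declared, the noop table can terminate a streaming query as well as a batch write, which is useful for measuring a pipeline without sink overhead. A usage sketch (the "noop" short name registered by NoopDataSource and the checkpoint path are assumptions here):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("noop-stream").getOrCreate()

// The built-in "rate" source emits rows continuously; the noop table now accepts them
// as a streaming sink because it reports STREAMING_WRITE in addition to BATCH_WRITE.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .writeStream
  .format("noop")                                      // assumed short name of NoopDataSource
  .option("checkpointLocation", "/tmp/noop-checkpoint") // assumed scratch path
  .start()

query.awaitTermination(10000) // run for ~10 seconds, then let the example exit
query.stop()
```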
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}

/**
* This rule adds basic table capability checks for streaming scans, without knowing the actual
* streaming execution mode.
*/
object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
throw new AnalysisException(
s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
case _ =>
}

val streamingSources = plan.collect {
case r: StreamingRelationV2 => r.table
}
val v1StreamingRelations = plan.collect {
case r: StreamingRelation => r
}

if (streamingSources.length + v1StreamingRelations.length > 1) {
val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
// v1 streaming data source only supports micro-batch.
val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
v1StreamingRelations.isEmpty
if (!allSupportsMicroBatch && !allSupportsContinuous) {
val microBatchSources =
streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
v1StreamingRelations.map(_.sourceName)
val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
throw new AnalysisException(
"The streaming sources in a query do not have a common supported execution mode.\n" +
"Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
"Sources support continuous: " + continuousSources.mkString(", "))
}
}
}
}
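
To make the rule concrete, here is a standalone mirror of its mode-compatibility logic that runs without a Spark build. Plain strings stand in for TableCapability values and FakeSource stands in for a Table; the source names are made up:

```scala
// Standalone mirror of V2StreamingScanSupportCheck's decision logic, for illustration only.
object StreamingModeCheckSketch {

  case class FakeSource(name: String, capabilities: Set[String])

  def check(v2Sources: Seq[FakeSource], v1SourceNames: Seq[String]): Unit = {
    // Every v2 streaming source must support at least one streaming scan mode.
    v2Sources
      .find(s => !s.capabilities("MICRO_BATCH_READ") && !s.capabilities("CONTINUOUS_READ"))
      .foreach(s => sys.error(s"Table ${s.name} does not support either micro-batch or continuous scan."))

    // With more than one source, all of them must share at least one execution mode.
    if (v2Sources.length + v1SourceNames.length > 1) {
      val allSupportMicroBatch = v2Sources.forall(_.capabilities("MICRO_BATCH_READ"))
      // V1 streaming sources only support micro-batch, so any of them rules out continuous.
      val allSupportContinuous =
        v2Sources.forall(_.capabilities("CONTINUOUS_READ")) && v1SourceNames.isEmpty
      if (!allSupportMicroBatch && !allSupportContinuous) {
        sys.error("The streaming sources in a query do not have a common supported execution mode.")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val both = FakeSource("kafka-like", Set("MICRO_BATCH_READ", "CONTINUOUS_READ"))
    val continuousOnly = FakeSource("continuous-only", Set("CONTINUOUS_READ"))

    check(Seq(both), Nil)                 // single source: passes
    check(Seq(both, continuousOnly), Nil) // both support continuous: passes
    try check(Seq(continuousOnly), Seq("socket")) // continuous-only v2 + micro-batch-only v1: rejected
    catch { case e: RuntimeException => println(s"Rejected as expected: ${e.getMessage}") }
  }
}
```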