
[SPARK-24630][SS] Support SQLStreaming in Spark #22575

Closed
wants to merge 9 commits
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
validateStreamOptions(parameters)
require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
if (schema.isDefined) {
logError("Kafka source has a fixed schema and cannot be set with a custom one")
}
(shortName(), KafkaOffsetReader.kafkaSchema)
}

@@ -390,7 +390,7 @@ querySpecification
(RECORDREADER recordReader=STRING)?
fromClause?
(WHERE where=booleanExpression)?)
| ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause?
| ((kind=SELECT (stream=STREAM)? (hints+=hint)* setQuantifier? namedExpressionSeq fromClause?
| fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
lateralView*
(WHERE where=booleanExpression)?
@@ -772,12 +772,13 @@ nonReserved
| NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE
| AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
| UNBOUNDED | WHEN
| DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT
| DATABASE | SELECT | STREAM | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT
| DIRECTORY
| BOTH | LEADING | TRAILING
;

SELECT: 'SELECT';
STREAM: 'STREAM';
FROM: 'FROM';
ADD: 'ADD';
AS: 'AS';
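
For illustration, this is the kind of statement the new STREAM keyword is meant to enable; the table name kafka_sql_test and the enabling configuration are assumptions for this sketch, not part of the grammar change itself:

// A minimal sketch, assuming SQLStreaming is enabled and kafka_sql_test is backed by a
// streaming source: the only syntactic difference from a batch query is the STREAM keyword.
spark.sql("SELECT STREAM value, count(*) AS cnt FROM kafka_sql_test GROUP BY value")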
@@ -51,6 +51,20 @@ case class UnresolvedRelation(tableIdentifier: TableIdentifier)
override lazy val resolved = false
}

/**
* Holds the name of a stream relation that has yet to be looked up in a catalog.
* @param tableIdentifier the identifier of the stream table to resolve
*/
case class UnresolvedStreamRelation(tableIdentifier: TableIdentifier) extends LeafNode {

/** Returns a `.` separated name for this relation. */
def tableName: String = tableIdentifier.unquotedString

override def output: Seq[Attribute] = Nil

override lazy val resolved = false
}

/**
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].
@@ -258,6 +258,11 @@ case class CatalogTable(
StructType(partitionFields)
}

/** Returns true if this table is a stream table. */
def isStreaming: Boolean = {
provider.isDefined && storage.properties.getOrElse("isStreaming", "false").toBoolean
}

/**
* schema of this table's data columns
*/
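
As a hedged sketch of how a table could carry that property (the DDL below is illustrative; this excerpt only shows the catalog-side check), a data source table whose OPTIONS include isStreaming would satisfy the isStreaming helper above:

// Illustrative only: OPTIONS of a data source table end up in storage.properties,
// so isStreaming 'true' makes CatalogTable.isStreaming return true for this table.
spark.sql(
  """CREATE TABLE kafka_sql_test
    |USING kafka
    |OPTIONS (
    |  'kafka.bootstrap.servers' 'host1:9092',
    |  'subscribe' 'topic1',
    |  'isStreaming' 'true'
    |)""".stripMargin)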
@@ -724,7 +724,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* are defined as a number between 0 and 100.
* - TABLESAMPLE(BUCKET x OUT OF y): Sample the table down to a 'x' divided by 'y' fraction.
*/
private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
protected def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
// Create a sampled plan if we need one.
def sample(fraction: Double): Sample = {
// The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
@@ -889,7 +889,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* If aliases are specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and
* column aliases for a [[LogicalPlan]].
*/
private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
if (tableAlias.strictIdentifier != null) {
val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan)
if (tableAlias.identifierList != null) {
@@ -498,6 +498,10 @@ case class WithWindowDefinition(
override def output: Seq[Attribute] = child.output
}

case class WithStreamDefinition(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
@@ -631,6 +631,33 @@ object SQLConf {
.intConf
.createWithDefault(200)

val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
.doc("The output mode used in sqlstreaming")
.stringConf
.createWithDefault("append")

val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
.doc("The structstreaming trigger used in sqlstreaming")
.stringConf
.createWithDefault("0s")

val SQLSTREAM_QUERY_NAME = buildConf("spark.sqlstreaming.queryName")
.doc("The structstreaming query name used in sqlstreaming. " +
"User must use spark.sql.streaming.checkpointLocation and " +
"spark.sqlstreaming.queryName to ensure the unique checkpointLocation")
.stringConf
.createOptional

val SQLSTREAM_CONSOLE_OUTPUT_ROWS = buildConf("spark.sqlstreaming.console.numRows")
.doc("The num of rows showed to console sink in sqlstreaming")
.stringConf
.createWithDefault("20")

val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sqlstreaming.query.enable")
.doc("Whether to enable use sqlstreaming in spark")
.booleanConf
.createWithDefault(false)

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
@@ -1813,6 +1840,16 @@ class SQLConf extends Serializable with Logging {

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)

def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE)

def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER)

def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME)

def sqlStreamConsoleOutputRows: String = getConf(SQLSTREAM_CONSOLE_OUTPUT_ROWS)

def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE)

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

def convertCTAS: Boolean = getConf(CONVERT_CTAS)
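
A minimal usage sketch for the new SQLSTREAM_* options above, assuming SQLStreaming is wired up as in this patch; the concrete values are illustrative, not defaults mandated by the change:

// Illustrative configuration for a SQLStreaming session (values are examples only).
spark.conf.set("spark.sqlstreaming.query.enable", "true")      // opt in to SQLStreaming
spark.conf.set("spark.sqlstreaming.outputMode", "complete")    // output mode of the query
spark.conf.set("spark.sqlstreaming.trigger", "5 seconds")      // trigger interval
spark.conf.set("spark.sqlstreaming.queryName", "wordcount")    // query name
// The checkpoint location pairs with the query name to keep checkpoints unique.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/sqlstreaming/checkpoints")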
@@ -26,6 +26,7 @@ import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStreamRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
@@ -40,7 +41,7 @@ import org.apache.spark.sql.types.StructType
* Concrete parser for Spark SQL statements.
*/
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
val astBuilder = new SparkSqlAstBuilder(conf)
val astBuilder = new SparkSqlAstBuilderExt(conf)

private val substitutor = new VariableSubstitution(conf)

@@ -49,6 +50,61 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
}
}

/**
* Extends SparkSqlAstBuilder.
* visitQuerySpecification and visitTableName are overridden to
* handle the SQLStreaming syntax.
*/
class SparkSqlAstBuilderExt(conf: SQLConf) extends SparkSqlAstBuilder(conf) {

import ParserUtils._

/**
* Marks whether the current SQL statement contains a stream query. This flag must be set
* before table names are visited, so that tables are parsed into UnresolvedStreamRelation
* for stream queries.
*/
private var isStreamQuery = false

/**
* Create a logical plan using a query specification.
* Wraps the result in a WithStreamDefinition plan if isStreamQuery is true.
* @param ctx the parse tree
*/
override def visitQuerySpecification(ctx: QuerySpecificationContext): LogicalPlan = {
isStreamQuery = isStreamQuery || (ctx.STREAM() != null)

val result = super.visitQuerySpecification(ctx)

// With Stream
withStreams(ctx, result)
}

/**
* Create an aliased table reference. This is typically used in FROM clauses.
* Produces an UnresolvedStreamRelation instead of an UnresolvedRelation if isStreamQuery is true.
* @param ctx the parse tree
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {

val relation = if (isStreamQuery) {
UnresolvedStreamRelation(visitTableIdentifier(ctx.tableIdentifier))
} else {
UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
}

val table = mayApplyAliasPlan(ctx.tableAlias, relation)
table.optionalMap(ctx.sample)(withSample)
}

private def withStreams(ctx: QuerySpecificationContext, query: LogicalPlan): LogicalPlan = {
if (isStreamQuery) {
WithStreamDefinition(query)
} else {
query
}
}
}

/**
* Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
*/
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.SparkSqlParserSuite

/**
* Tests for parsing of stream queries.
*/
class StreamingQuerySuite extends SparkSqlParserSuite {

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

private lazy val parser = new SparkSqlParser(newConf)

// Builds an UnresolvedStreamRelation for the given table name
def streamTable(ref: String): LogicalPlan = UnresolvedStreamRelation(TableIdentifier(ref))

private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = {
val normalized1 = normalizePlan(parser.parsePlan(sqlCommand))
val normalized2 = normalizePlan(plan)
comparePlans(normalized1, normalized2)
}

test("simple stream query") {
assertEqual("select stream * from b",
WithStreamDefinition(streamTable("b").select(star())))
}

test("simple stream join batch") {
// stream join batch
assertEqual("select stream * from b join s",
WithStreamDefinition(
streamTable("b").join(streamTable("s"), Inner, None).select(star())
)
)
}

test("simple subquery(stream) join batch") {
// subquery(stream) join batch
assertEqual("select stream * from (select * from b) as c join s",
WithStreamDefinition(
WithStreamDefinition(streamTable("b").select(star())).as("c")
.join(streamTable("s"), Inner, None).select(star())
)
)
}

test("simple stream join subquery(batch)") {
// stream join subquery(batch)
assertEqual("select stream * from s join (select * from b) as c",
WithStreamDefinition(
streamTable("s").join(
WithStreamDefinition(streamTable("b").select(star())).as("c"),
Inner,
None).select(star())
)
)
}

test("simple subquery(stream) join subquery(batch)") {
// subquery(stream) join subquery(batch)
assertEqual("select stream * from (select * from s) as t join (select * from b) as c",
WithStreamDefinition(
WithStreamDefinition(streamTable("s").select(star())).as("t")
.join(WithStreamDefinition(streamTable("b").select(star())).as("c"), Inner, None)
.select(star())
)
)
}
}
@@ -296,6 +296,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
val tableProperties = tableMetaToTableProps(table)

if (table.isStreaming) {
tableProperties.put(DATASOURCE_STREAM_TABLE, "true")
}

// put table provider and partition provider in table properties.
tableProperties.put(DATASOURCE_PROVIDER, provider)
if (table.tracksPartitionsInCatalog) {
@@ -1313,6 +1317,7 @@ object HiveExternalCatalog {
val CREATED_SPARK_VERSION = SPARK_SQL_PREFIX + "create.version"

val HIVE_GENERATED_STORAGE_PROPERTIES = Set(SERIALIZATION_FORMAT)
val DATASOURCE_STREAM_TABLE = DATASOURCE_PREFIX + "isStreaming"

// When storing data source tables in hive metastore, we need to set data schema to empty if the
// schema is hive-incompatible. However we need a hack to preserve existing behavior. Before
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.execution.{ResolveStreamRelation, ValidSQLStreaming}
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}

/**
@@ -68,7 +69,15 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
*/
override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new ResolveHiveSerdeTable(session) +:
/** Resolves UnresolvedStreamRelation nodes, which are produced by SQLStreaming queries. */
new ResolveStreamRelation(catalog, conf, session) +:
/**
* Checks whether the SQLStreaming plan is valid, converts InsertIntoTable into the
* corresponding stream sink, or adds a console sink when the query is a plain SELECT.
*/
new ValidSQLStreaming(session, conf) +:
new ResolveHiveSerdeTable(session) +:
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
customResolutionRules
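
As a hedged sketch of what such an extended resolution rule looks like in general (the PR's actual ResolveStreamRelation implementation is not part of this excerpt), a rule is a Rule[LogicalPlan] that rewrites matching nodes, here UnresolvedStreamRelation, using catalog metadata:

import org.apache.spark.sql.catalyst.analysis.UnresolvedStreamRelation
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: shows the shape of a resolution rule, not the PR's implementation.
class ResolveStreamRelationSketch(catalog: SessionCatalog) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case u: UnresolvedStreamRelation =>
      // Look up the table; the real rule would build a streaming relation from its metadata.
      val table = catalog.getTableMetadata(u.tableIdentifier)
      ???
  }
}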