From af26ea77157a7ff4e0a2c5eecec64c57f73c425d Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Fri, 28 Sep 2018 09:04:17 +0800 Subject: [PATCH 1/7] Support SQLStreaming in Spark: Add keyword 'STREAM'; Support create stream table --- .../spark/sql/catalyst/parser/SqlBase.g4 | 5 +- .../sql/catalyst/analysis/unresolved.scala | 14 +++ .../sql/catalyst/catalog/interface.scala | 5 + .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../plans/logical/basicLogicalOperators.scala | 4 + .../spark/sql/execution/SparkSqlParser.scala | 58 +++++++++++- .../streaming/StreamingQuerySuite.scala | 91 +++++++++++++++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 5 + .../sql/hive/StreamTableDDLCommandSuite.scala | 43 +++++++++ 9 files changed, 224 insertions(+), 5 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 94283f59011a8..a1ea18404ba43 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -390,7 +390,7 @@ querySpecification (RECORDREADER recordReader=STRING)? fromClause? (WHERE where=booleanExpression)?) - | ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause? + | ((kind=SELECT (stream=STREAM)? (hints+=hint)* setQuantifier? namedExpressionSeq fromClause? | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?) lateralView* (WHERE where=booleanExpression)? @@ -772,12 +772,13 @@ nonReserved | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN - | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT + | DATABASE | SELECT | STREAM | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | DIRECTORY | BOTH | LEADING | TRAILING ; SELECT: 'SELECT'; +STREAM: 'STREAM'; FROM: 'FROM'; ADD: 'ADD'; AS: 'AS'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index c1ec736c32ed4..7b005badc42fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -51,6 +51,20 @@ case class UnresolvedRelation(tableIdentifier: TableIdentifier) override lazy val resolved = false } +/** + * Holds the name of a stream relation that has yet to be looked up in a catalog. + * @param tableIdentifier + */ +case class UnresolvedStreamRelation(tableIdentifier: TableIdentifier) extends LeafNode { + + /** Returns a `.` separated name for this relation. */ + def tableName: String = tableIdentifier.unquotedString + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false +} + /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. 
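
To make the intent of this first patch concrete, below is a minimal sketch of how the new STREAM keyword and stream tables are meant to be used together. It assumes a SparkSession with Hive support and a reachable Kafka broker; the table name, topic and bootstrap servers are illustrative and not taken from the patch itself.

    import org.apache.spark.sql.SparkSession

    object SqlStreamingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("sqlstreaming-sketch")
          .enableHiveSupport()
          .getOrCreate()

        // A table whose storage properties carry isStreaming = 'true' is treated
        // as a stream table (see CatalogTable.isStreaming in this patch).
        spark.sql(
          """CREATE TABLE kafka_sql_test
            |USING kafka
            |OPTIONS (
            |  subscribe = 'topic-sql-test',
            |  kafka.bootstrap.servers = 'localhost:9092',
            |  isStreaming = 'true')
          """.stripMargin)

        // SELECT STREAM marks the query as streaming, so the parser resolves
        // kafka_sql_test to an UnresolvedStreamRelation instead of an
        // UnresolvedRelation.
        spark.sql("SELECT STREAM CAST(value AS STRING) FROM kafka_sql_test")

        spark.stop()
      }
    }
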
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 30ded13410f7c..52c84940b5731 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -258,6 +258,11 @@ case class CatalogTable( StructType(partitionFields) } + /** Return true if the table is stream table */ + def isStreaming: Boolean = { + provider.isDefined && storage.properties.getOrElse("isStreaming", "false").toBoolean + } + /** * schema of this table's data columns */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5cfb5dc871041..a22ec584d34f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -724,7 +724,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * are defined as a number between 0 and 100. * - TABLESAMPLE(BUCKET x OUT OF y): Sample the table down to a 'x' divided by 'y' fraction. */ - private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + protected def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { // Create a sampled plan if we need one. def sample(fraction: Double): Sample = { // The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling @@ -889,7 +889,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * If aliases specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and * column aliases for a [[LogicalPlan]]. 
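
For reference, the new CatalogTable.isStreaming flag can be checked from a spark-shell once a table has been created with isStreaming = 'true' in its options; this is the same check StreamTableDDLCommandSuite performs further down. The table name reuses the illustrative kafka_sql_test from the sketch above.

    import org.apache.spark.sql.catalyst.TableIdentifier

    // getTableMetadata returns the CatalogTable; isStreaming is true only when
    // the table has a provider and its storage properties carry
    // isStreaming = 'true', and false for ordinary batch tables.
    val meta = spark.sessionState.catalog
      .getTableMetadata(TableIdentifier("kafka_sql_test"))
    assert(meta.isStreaming)
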
*/ - private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = { + protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = { if (tableAlias.strictIdentifier != null) { val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan) if (tableAlias.identifierList != null) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 7ff83a9be3622..b173e75bd935d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -498,6 +498,10 @@ case class WithWindowDefinition( override def output: Seq[Attribute] = child.output } +case class WithStreamDefinition(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + /** * @param order The ordering expressions * @param global True means global sorting apply for entire data set, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 89cb63784c0f4..24fd9cd6d37a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -26,6 +26,7 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStreamRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -40,7 +41,7 @@ import org.apache.spark.sql.types.StructType * Concrete parser for Spark SQL statements. */ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { - val astBuilder = new SparkSqlAstBuilder(conf) + val astBuilder = new SparkSqlAstBuilderExt(conf) private val substitutor = new VariableSubstitution(conf) @@ -49,6 +50,61 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { } } +/** + * Inheriting from SparkSqlAstBuilder. + * visitQuerySpecification and visitTableName are rewritten to + * handle syntax parsing of sqlstreaming. + */ +class SparkSqlAstBuilderExt(conf: SQLConf) extends SparkSqlAstBuilder(conf) { + + import ParserUtils._ + + /** + * Mark if current sql contain stream query. This field should be set before visit table name, + * so that table will be parse to UnResolvedStreamRelation in Stream Query case. + */ + private var isStreamQuery = false + + /** + * Create a logical plan using a query specification. + * Add WithStreamDefinition plan if isStreamQuery is true + * @param ctx the parse tree + */ + override def visitQuerySpecification(ctx: QuerySpecificationContext): LogicalPlan = { + isStreamQuery = isStreamQuery || (ctx.STREAM() != null) + + val result = super.visitQuerySpecification(ctx) + + // With Stream + withStreams(ctx, result) + } + + /** + * Create an aliased table reference. This is typically used in FROM clauses. 
+ * Define UnresolvedStreamRelation if isStreamQuery is true + * @param ctx the parse tree + */ + override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { + + val relaion = if (isStreamQuery) { + UnresolvedStreamRelation(visitTableIdentifier(ctx.tableIdentifier)) + } else { + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier)) + } + + val table = mayApplyAliasPlan(ctx.tableAlias, relaion) + table.optionalMap(ctx.sample)(withSample) + } + + private def withStreams(ctx: QuerySpecificationContext, query: LogicalPlan): LogicalPlan = { + if (isStreamQuery) { + WithStreamDefinition(query) + } else { + query + } + } +} + /** * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala new file mode 100644 index 0000000000000..2665ef362d278 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.execution.SparkSqlParserSuite +/** + * Used to check stream query + */ +class StreamingQuerySuite extends SparkSqlParserSuite { + + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.dsl.plans._ + + private lazy val parser = new SparkSqlParser(newConf) + + // change table to UnresolvedStreamRelation + def streamTable(ref: String): LogicalPlan = UnresolvedStreamRelation(TableIdentifier(ref)) + + private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = { + val normalized1 = normalizePlan(parser.parsePlan(sqlCommand)) + val normalized2 = normalizePlan(plan) + comparePlans(normalized1, normalized2) + } + + test("simple stream query") { + assertEqual("select stream * from b", + WithStreamDefinition(streamTable("b").select(star()))) + } + + test("simple stream join batch") { + // stream join batch + assertEqual("select stream * from b join s", + WithStreamDefinition( + streamTable("b").join(streamTable("s"), Inner, None).select(star()) + ) + ) + } + + test("simple subquery(stream) join batch") { + // subquery(stream) join batch + assertEqual("select stream * from (select * from b) as c join s", + WithStreamDefinition( + WithStreamDefinition(streamTable("b").select(star())).as("c") + .join(streamTable("s"), Inner, None).select(star()) + ) + ) + } + + test("simple stream join subquery(batch)") { + // stream join subquery(batch) + assertEqual("select stream * from s join (select * from b) as c", + WithStreamDefinition( + streamTable("s").join( + WithStreamDefinition(streamTable("b").select(star())).as("c"), + Inner, + None).select(star()) + ) + ) + } + + test("simple subquery(stream) join subquery(batch)") { + // subquery(stream) join subquery(batch) + assertEqual("select stream * from (select * from s) as t join (select * from b) as c", + WithStreamDefinition( + WithStreamDefinition(streamTable("s").select(star())).as("t") + .join(WithStreamDefinition(streamTable("b").select(star())).as("c"), Inner, None) + .select(star()) + ) + ) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 505124ae9e7c8..cecb1f221af0d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -296,6 +296,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 val tableProperties = tableMetaToTableProps(table) + if (table.isStreaming) { + tableProperties.put(DATASOURCE_STREAM_TABLE, "true") + } + // put table provider and partition provider in table properties. 
tableProperties.put(DATASOURCE_PROVIDER, provider) if (table.tracksPartitionsInCatalog) { @@ -1313,6 +1317,7 @@ object HiveExternalCatalog { val CREATED_SPARK_VERSION = SPARK_SQL_PREFIX + "create.version" val HIVE_GENERATED_STORAGE_PROPERTIES = Set(SERIALIZATION_FORMAT) + val DATASOURCE_STREAM_TABLE = DATASOURCE_PREFIX + "isStreaming" // When storing data source tables in hive metastore, we need to set data schema to empty if the // schema is hive-incompatible. However we need a hack to preserve existing behavior. Before diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala new file mode 100644 index 0000000000000..e2421021c61e8 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton { + private val catalog = spark.sessionState.catalog + + test("CTAS: create data source stream table") { + withTempPath { dir => + withTable("t") { + sql( + s"""CREATE TABLE t USING PARQUET + |OPTIONS ( + |PATH = '${dir.toURI}', + |location = '${dir.toURI}', + |isStreaming = 'true') + |AS SELECT 1 AS a, 2 AS b, 3 AS c + """.stripMargin + ) + val streamTable = catalog.getTableMetadata(TableIdentifier("t")) + assert(streamTable.isStreaming) + } + } + } +} From 36c68a1227796d250989bdbf7ebc5cde11906099 Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Fri, 28 Sep 2018 09:04:17 +0800 Subject: [PATCH 2/7] Support SQLStreaming in Spark: Add analysis rule for SQLStreaming --- .../sql/kafka010/KafkaSourceProvider.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 37 +++++ .../sql/hive/HiveSessionStateBuilder.scala | 11 +- .../execution/ResolveStreamRelation.scala | 148 ++++++++++++++++++ .../sql/hive/execution/SQLStreamingSink.scala | 115 ++++++++++++++ .../hive/execution/ValidSQLStreaming.scala | 117 ++++++++++++++ .../ResolveStreamRelationSuite.scala | 140 +++++++++++++++++ .../execution/ValidSQLStreamingSuite.scala | 92 +++++++++++ 8 files changed, 662 insertions(+), 2 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala create mode 100644 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 28c9853bfea9c..e783dd5b00f69 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister providerName: String, parameters: Map[String, String]): (String, StructType) = { validateStreamOptions(parameters) - require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one") + if(schema.isDefined) { + logError("Kafka source has a fixed schema and cannot be set with a custom one") + } (shortName(), KafkaOffsetReader.kafkaSchema) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f6c98805bfb15..14e141b4f4745 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -631,6 +631,33 @@ object SQLConf { .intConf .createWithDefault(200) + val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode") + .doc("The output mode used in sqlstreaming") + .stringConf + .createWithDefault("append") + + val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger") + .doc("The structstreaming trigger used in sqlstreaming") + .stringConf + .createWithDefault("0s") + + val SQLSTREAM_QUERY_NAME = buildConf("spark.sqlstreaming.queryName") + .doc("The structstreaming query name used in sqlstreaming. 
" + + "User must use spark.sql.streaming.checkpointLocation and " + + "spark.sqlstreaming.queryName to ensure the unique checkpointLocation") + .stringConf + .createOptional + + val SQLSTREAM_CONSOLE_OUTPUT_ROWS = buildConf("spark.sqlstreaming.console.numRows") + .doc("The num of rows showed to console sink in sqlstreaming") + .stringConf + .createWithDefault("20") + + val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sqlstreaming.query.enable") + .doc("Whether to enable use sqlstreaming in spark") + .booleanConf + .createWithDefault(false) + // This is used to set the default data source val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default") .doc("The default data source to use in input/output.") @@ -1813,6 +1840,16 @@ class SQLConf extends Serializable with Logging { def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT) + def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE) + + def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER) + + def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME) + + def sqlStreamConsoleOutputRows: String = getConf(SQLSTREAM_CONSOLE_OUTPUT_ROWS) + + def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE) + def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME) def convertCTAS: Boolean = getConf(CONVERT_CTAS) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 2882672f327c4..86c55fccc8a79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.execution.{ResolveStreamRelation, ValidSQLStreaming} import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} /** @@ -68,7 +69,15 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session */ override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveHiveSerdeTable(session) +: + /** Used to resolve UnResolvedStreamRelation, which is used in SQLstreaming */ + new ResolveStreamRelation(catalog, conf, session) +: + /** + * Check whether sqlstreaming plan is valid and + * change InsertIntoTable to specific Stream Sink, + * or add Console Sink when user just select + */ + new ValidSQLStreaming(session, conf) +: + new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: customResolutionRules diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala new file mode 100644 index 0000000000000..3c826cbe9b7eb --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.util.Locale + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.types.StructType + +/** + * Used to resolve UnResolvedStreamRelaTion, which is used in sqlstreaming + * Change UnResolvedStreamRelation to StreamingRelation if table is stream_table, + * otherwise, change UnResolvedStreamRelation to HiveTableRelation or other source Relation + * @param catalog + * @param conf + * @param sparkSession + */ +class ResolveStreamRelation(catalog: SessionCatalog, + conf: SQLConf, + sparkSession: SparkSession) + extends Rule[LogicalPlan] with CheckAnalysis { + + private def lookupRelation(relation: UnresolvedStreamRelation, + defaultDatabase: Option[String] = None): LogicalPlan = { + + val tableIdentWithDb = relation.tableIdentifier.copy( + database = relation.tableIdentifier.database.orElse(defaultDatabase)) + + try { + val dbName = tableIdentWithDb.database.getOrElse(catalog.getCurrentDatabase) + val db = formatDatabaseName(dbName) + val table = formatTableName(tableIdentWithDb.table) + val metadata = catalog.externalCatalog.getTable(db, table) + + if (metadata.isStreaming) { + lookupStreamingRelation(metadata) + } else { + catalog.lookupRelation(tableIdentWithDb) + } + } catch { + case _: NoSuchTableException => + relation.failAnalysis( + s"Stream Table or view not found: ${tableIdentWithDb.unquotedString}") + // If the database is defined and that database is not found, throw an AnalysisException. + // Note that if the database is not defined, it is possible we are looking up a temp view. + case e: NoSuchDatabaseException => + relation.failAnalysis(s"Stream Table or view not found: " + + s"${tableIdentWithDb.unquotedString}, the database ${e.db} doesn't exsits.") + } + } + + /** + * Format table name, taking into account case sensitivity. + */ + protected[this] def formatTableName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + } + + /** + * Format database name, taking into account case sensitivity. 
+ */ + protected[this] def formatDatabaseName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + } + + /** + * Create StreamingRelation from table + * @param table + * @return + */ + private[hive] def lookupStreamingRelation(table: CatalogTable): LogicalPlan = { + table.provider match { + case Some(sourceName) => + DataSource.lookupDataSource(sourceName, conf).newInstance() match { + case s: StreamSourceProvider => + createStreamingRelation(table, usingFileStreamSource = false) + case format: FileFormat => + createStreamingRelation(table, usingFileStreamSource = true) + case _ => + throw new Exception(s"Cannot find Streaming Relation for provider $sourceName") + } + case _ => + throw new Exception("Invalid provider for Streaming Relation") + } + } + + /** + * Create StreamingRelation from table for SourceProvider + * @param table + * @return + */ + private def createStreamingRelation( + table: CatalogTable, + usingFileStreamSource: Boolean): LogicalPlan = { + + /** + * Get Source or Sink meta data from table. + */ + var sourceProperties = table.storage.properties + val partitionColumnNames = table.partitionColumnNames + val sourceName = table.provider.get + sourceProperties += ("source" -> sourceName) + var userSpecifiedSchema: Option[StructType] = Some(table.schema) + + if (usingFileStreamSource) { + sourceProperties += ("path" -> table.location.getPath) + } + + StreamingRelation( + DataSource( + sparkSession, + sourceName, + userSpecifiedSchema = userSpecifiedSchema, + options = sourceProperties, + partitionColumns = partitionColumnNames + ) + ) + } + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case u: UnresolvedStreamRelation => + val defaultDatabase = AnalysisContext.get.defaultDatabase + lookupRelation(u, defaultDatabase) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala new file mode 100644 index 0000000000000..851acf6b5b99e --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils +import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.Utils + +/** + * The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query. + * @param sparkSession + * @param extraOptions + * @param partitionColumnNames + * @param child + */ +case class SQLStreamingSink(sparkSession: SparkSession, + extraOptions: Map[String, String], + partitionColumnNames: Seq[String], + child: LogicalPlan) + extends RunnableCommand { + + private val sqlConf = sparkSession.sqlContext.conf + + /** + * The given column name may not be equal to any of the existing column names if we were in + * case-insensitive context. Normalize the given column name to the real one so that we don't + * need to care about case sensitivity afterwards. + */ + private def normalize(df: DataFrame, columnName: String, columnType: String): String = { + val validColumnNames = df.logicalPlan.output.map(_.name) + validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName)) + .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " + + s"existing columns (${validColumnNames.mkString(", ")})")) + } + + /** + * Parse spark.sqlstreaming.trigger.seconds to Trigger + */ + private def parseTrigger(): Trigger = { + val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger) + Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS) + } + + /** + * Running by queryExecution.executeCollect() + * @param sparkSession + * @return return empty rdds, save as DDLCommands + */ + override def run(sparkSession: SparkSession): Seq[Row] = { + + /////////////////////////////////////////////////////////////////////////////////////// + // Builder pattern config options + /////////////////////////////////////////////////////////////////////////////////////// + val df = Dataset.ofRows(sparkSession, child) + val source = extraOptions("source") + val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode) + val normalizedParCols = partitionColumnNames.map { + normalize(df, _, "Partition") + } + + val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) + val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",") + var options = extraOptions + val sink = ds.newInstance() match { + case w: StreamingWriteSupportProvider + if !disabledSources.contains(w.getClass.getCanonicalName) => + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + w, df.sparkSession.sessionState.conf) + options = sessionOptions ++ extraOptions + w + case _ => + val ds = DataSource( + df.sparkSession, + className = source, + options = options, + partitionColumns = normalizedParCols) + ds.createSink(outputMode) + } + + sparkSession.sessionState.streamingQueryManager.startQuery( + sqlConf.sqlStreamQueryName, + None, + df, + extraOptions, + sink, + outputMode, + trigger = parseTrigger() + ).awaitTermination() + + Seq.empty[Row] + } +} diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala new file mode 100644 index 0000000000000..a4c3920edeebf --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CheckAnalysis +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.internal.SQLConf + +/** + * Check whether sqlstreaming is valid, call failAnalysis if failed + * @param sparkSession + */ +class ValidSQLStreaming(sparkSession: SparkSession, sqlConf: SQLConf) + extends Rule[LogicalPlan] with CheckAnalysis { + + /** + * Check whether logicalPlan is valid, if valid then create SQLStreamingSink. + * If user use InsertIntoTable, then use table meta to create SQLStreamingSink, + * if not, then create ConsoleSink to print result to Console + * @param tableMeta the CatalogTable for stream source + * @param child the logicalPlan of dataframe + * @return + */ + private def createSQLStreamingSink(tableMeta: Option[CatalogTable], + child: LogicalPlan): LogicalPlan = { + + checkSQLStreamingEnable() + + tableMeta match { + case Some(table) => + // Used when user define the sink meta + if (!table.isStreaming) { + failAnalysis(s"Not supported to insert write into " + + s"none stream table ${table.qualifiedName}") + } + val resolveStreamRelation = new ResolveStreamRelation(sparkSession.sessionState.catalog, + sqlConf, sparkSession) + val streamingRelation = resolveStreamRelation.lookupStreamingRelation(table) + val extraOptions = streamingRelation.asInstanceOf[StreamingRelation].dataSource.options + val partitionColumnNames = table.partitionColumnNames + SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) + + case None => + // Used when user use just select stream * from table. 
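
To show where ValidSQLStreaming and SQLStreamingSink fit together, here is a sketch of the intended end-to-end usage from a spark-shell. It assumes two stream tables, kafka_source_table and kafka_sink_table, already created with isStreaming = 'true'; the table names, checkpoint path and query name are illustrative.

    // SQLStreaming is disabled by default; checkpoint location plus query name
    // must make the checkpoint unique per query.
    spark.conf.set("spark.sqlstreaming.query.enable", "true")
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/sqlstreaming/ckpt")
    spark.conf.set("spark.sqlstreaming.queryName", "kafka_copy")

    // InsertIntoTable over a stream table is rewritten into a SQLStreamingSink,
    // which starts a Structured Streaming query and blocks in awaitTermination()
    // until the query stops.
    spark.sql(
      """INSERT INTO kafka_sink_table
        |SELECT STREAM key, value FROM kafka_source_table
      """.stripMargin)
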
+ var extraOptions = Map("numRows" -> sqlConf.sqlStreamConsoleOutputRows) + extraOptions += ("source" -> "console") + val partitionColumnNames = Seq() + SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) + } + } + + /** + * Remove unused WithStreamDefinition when use alias query + * @param plan the logicalPlan of dataframe + * @return logicalPlan + */ + private def removeUnusedWithStreamDefinition(plan: LogicalPlan): LogicalPlan = + plan.resolveOperators { + case logical: WithStreamDefinition => + logical.child + } + + /** + * Default Disable SQLStreaming. User must set true to run SQLStreaming + */ + private def checkSQLStreamingEnable(): Unit = { + if (!sqlConf.sqlStreamQueryEnable) { + failAnalysis("Disable SQLStreaming for default. If you want to use SQLSteaming, " + + s"set ${SQLConf.SQLSTREAM_QUERY_ENABLE.key}=true") + } + } + + /** + * Insert overwrite directory are not supported now, + * thus this program doesn't check valid insert overwrite directory + * @param plan + * @return + */ + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!plan.analyzed && plan.isStreaming) { + plan match { + + case InsertIntoTable(LogicalRelation(_, _, Some(tableMeta), _), _, child, _, _) + if child.resolved => + createSQLStreamingSink(Some(tableMeta), removeUnusedWithStreamDefinition(child)) + + case WithStreamDefinition(child) if child.resolved => + createSQLStreamingSink(None, removeUnusedWithStreamDefinition(child)) + + case _ => + plan + } + } else { + plan + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala new file mode 100644 index 0000000000000..56dbb4809ec1d --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.net.URI +import java.nio.file._ + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +/** + * Test whether UnresolvedStreamRelation can be transformed tp StreamRelation or LogicalRelation + */ +class ResolveStreamRelationSuite extends AnalysisTest with TestHiveSingleton { + + protected val sqlConf = hiveContext.conf + + private val tempPath: String = { + val temp = Paths.get("/tmp/somewhere") + if (!Files.exists(temp)) { + Files.createDirectory(temp) + } + temp.toString + } + + protected val csvOptions: Map[String, String] = Map( + "isStreaming" -> "true", + "source" -> "csv", + "path" -> tempPath, + "sep" -> "\t" + ) + + protected val parquetOptions: Map[String, String] = Map( + "isStreaming" -> "true", + "source" -> "parquet", + "path" -> tempPath + ) + + // the basic table schema for tests + protected val tableSchema: StructType = StructType(Seq( + StructField("key", BinaryType), + StructField("value", BinaryType), + StructField("topic", StringType), + StructField("partition", IntegerType), + StructField("offset", LongType), + StructField("timestamp", TimestampType), + StructField("timestampType", IntegerType) + )) + + /** + * Generate Catalog table + * @param tableName + * @param sourceName + * @param options + * @return + */ + protected def getCatalogTable(tableName: String, + sourceName: String, + options: Map[String, String]): CatalogTable = { + val storage = DataSource.buildStorageFormatFromOptions(options) + CatalogTable( + identifier = TableIdentifier(tableName, Some("default")), + tableType = CatalogTableType.MANAGED, + provider = Some(sourceName), + storage = storage, + schema = tableSchema, + partitionColumnNames = Nil) + } + + protected val prepareTables: Seq[CatalogTable] = Seq( + getCatalogTable("csvTable", "csv", csvOptions), + getCatalogTable("parquetTable", "parquet", parquetOptions) + ) + + // overwrite this method in order to change sqlConf in tests + override protected def getAnalyzer(caseSensitive: Boolean) = { + val conf = sqlConf.copy(SQLConf.CASE_SENSITIVE -> caseSensitive) + val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf) + catalog.createDatabase( + CatalogDatabase("default", "", new URI("loc"), Map.empty), + ignoreIfExists = false) + prepareTables.foreach{ table => + catalog.createTable(table, ignoreIfExists = true) + } + new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveStreamRelation(catalog, conf, spark) :: + new ValidSQLStreaming(spark, conf) :: + new FindDataSourceTable(spark) :: Nil + } + } + + protected def getStreamRelation( + sourceName: String, + options: Map[String, String]): StreamingRelation = + StreamingRelation( + DataSource( + sparkSession = spark, + className = sourceName, + options = options, + userSpecifiedSchema = Some(tableSchema), + partitionColumns = Seq() + ) + ) + + test("resolve stream relations") { + assertAnalysisError(UnresolvedStreamRelation(TableIdentifier("tAbLe")), Seq()) + checkAnalysis( + 
UnresolvedStreamRelation(TableIdentifier("csvTable")), + getStreamRelation("csv", csvOptions) + ) + checkAnalysis( + UnresolvedStreamRelation(TableIdentifier("parquetTable")), + getStreamRelation("parquet", parquetOptions) + ) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala new file mode 100644 index 0000000000000..49d6d1b5501ac --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedStreamRelation +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, WithStreamDefinition} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf + +class ValidSQLStreamingSuite extends ResolveStreamRelationSuite { + + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.dsl.plans._ + + private val consoleSinkOptions = Map( + "numRows" -> sqlConf.sqlStreamConsoleOutputRows, + "source" -> "console" + ) + + test("select stream * from csvTable with enable=false") { + sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, false) + assertAnalysisError( + WithStreamDefinition(getStreamRelation("csv", csvOptions).select(star())), + "Disable SQLStreaming for default." 
:: Nil + ) + } + + test("select stream * from csvTable with enable=true") { + sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) + assertAnalysisSuccess( + WithStreamDefinition( + UnresolvedStreamRelation(TableIdentifier("csvTable")) + ) + ) + } + + test("insert into csvTable select stream * from parquetTable") { + sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) + assertAnalysisSuccess( + InsertIntoTable( + LogicalRelation( + null, + Seq(), + Some(getCatalogTable("csvTable", "csv", csvOptions)), + isStreaming = false + ), + Map(), + WithStreamDefinition( + UnresolvedStreamRelation(TableIdentifier("parquetTable")) + ), + overwrite = false, + ifPartitionNotExists = false + ) + ) + } + + test("insert into csvTable select stream * from parquetTable with enable=true") { + sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) + assertAnalysisError( + InsertIntoTable( + LogicalRelation( + null, + Seq(), + Some(getCatalogTable("csvTable", "csv", csvOptions.filterNot(_._1 == "isStreaming"))), + isStreaming = false + ), + Map(), + WithStreamDefinition( + UnresolvedStreamRelation(TableIdentifier("parquetTable")) + ), + overwrite = false, + ifPartitionNotExists = false + ), + "Not supported to insert write into none stream table" :: Nil + ) + } +} From fba7bb442c04932d5492870a601a75293a49decb Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Sun, 21 Oct 2018 18:35:28 +0800 Subject: [PATCH 3/7] [SPARK-24630][SS] Support SQLStreaming in Spark: add watermark to SQLStreaming --- .../sql/kafka010/KafkaSourceProvider.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 14 +++++ .../execution/ResolveStreamRelation.scala | 62 ++++++++++++++++--- .../sql/hive/execution/SQLStreamingSink.scala | 3 + .../hive/execution/ValidSQLStreaming.scala | 30 ++++++++- .../sql/hive/StreamTableDDLCommandSuite.scala | 9 ++- .../ResolveStreamRelationSuite.scala | 19 +++++- .../execution/ValidSQLStreamingSuite.scala | 1 + 8 files changed, 122 insertions(+), 20 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index e783dd5b00f69..28c9853bfea9c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -63,9 +63,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister providerName: String, parameters: Map[String, String]): (String, StructType) = { validateStreamOptions(parameters) - if(schema.isDefined) { - logError("Kafka source has a fixed schema and cannot be set with a custom one") - } + require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one") (shortName(), KafkaOffsetReader.kafkaSchema) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 14e141b4f4745..924be57cb48d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -631,6 +631,16 @@ object SQLConf { .intConf .createWithDefault(200) + val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable") + .doc("Whether use watermark in sqlstreaming.") + .booleanConf + .createWithDefault(false) + + val SQLSTREAM_CONSOLESINK_ENABLE = 
buildConf("spark.sqlstreaming.consoleSink.enable") + .doc("Whether use console sink when user does not define insert into table.") + .booleanConf + .createWithDefault(false) + val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode") .doc("The output mode used in sqlstreaming") .stringConf @@ -1840,6 +1850,10 @@ class SQLConf extends Serializable with Logging { def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT) + def sqlStreamWaterMarkEnable: Boolean = getConf(SQLSTREAM_WATERMARK_ENABLE) + + def sqlStreamConsoleSinkEnable: Boolean = getConf(SQLSTREAM_CONSOLESINK_ENABLE) + def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE) def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala index 3c826cbe9b7eb..c2f9f46592cfd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala @@ -21,14 +21,15 @@ import java.util.Locale import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog, UnresolvedCatalogRelation} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.CalendarInterval /** * Used to resolve UnResolvedStreamRelaTion, which is used in sqlstreaming @@ -39,10 +40,18 @@ import org.apache.spark.sql.types.StructType * @param sparkSession */ class ResolveStreamRelation(catalog: SessionCatalog, - conf: SQLConf, - sparkSession: SparkSession) + sqlConf: SQLConf, + sparkSession: SparkSession) extends Rule[LogicalPlan] with CheckAnalysis { + /** + * SQLStreaming watermark configuration. + * User must use spark.sqlstreaming.watermark.{db}.{table}.[column/delay] to set watermark. + */ + private val SQLSTREAM_WATERMARK = "spark.sqlstreaming.watermark" + private val COLUMN = "column" + private val DELAY = "delay" + private def lookupRelation(relation: UnresolvedStreamRelation, defaultDatabase: Option[String] = None): LogicalPlan = { @@ -76,14 +85,14 @@ class ResolveStreamRelation(catalog: SessionCatalog, * Format table name, taking into account case sensitivity. */ protected[this] def formatTableName(name: String): String = { - if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + if (sqlConf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) } /** * Format database name, taking into account case sensitivity. 
*/ protected[this] def formatDatabaseName(name: String): String = { - if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + if (sqlConf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) } /** @@ -94,7 +103,7 @@ class ResolveStreamRelation(catalog: SessionCatalog, private[hive] def lookupStreamingRelation(table: CatalogTable): LogicalPlan = { table.provider match { case Some(sourceName) => - DataSource.lookupDataSource(sourceName, conf).newInstance() match { + DataSource.lookupDataSource(sourceName, sqlConf).newInstance() match { case s: StreamSourceProvider => createStreamingRelation(table, usingFileStreamSource = false) case format: FileFormat => @@ -129,15 +138,50 @@ class ResolveStreamRelation(catalog: SessionCatalog, sourceProperties += ("path" -> table.location.getPath) } - StreamingRelation( + val relation = StreamingRelation( DataSource( sparkSession, sourceName, userSpecifiedSchema = userSpecifiedSchema, options = sourceProperties, partitionColumns = partitionColumnNames - ) + ), + sourceName, + table.schema.toAttributes ) + + /** + * Check watermark + */ + withWaterMark(relation, table) + } + + /** + * Check watermark enable. If true, add watermark to relation. + * @param relation the basic streaming relation + * @param metadata table meta + * @return + */ + private def withWaterMark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = { + if (sqlConf.sqlStreamWaterMarkEnable) { + + logInfo("Using watermark in sqlstreaming") + val tableName = s"${metadata.identifier.database.get}.${metadata.identifier.table}" + val SQLSTREAM_WATERMARK_COLUMN = s"$SQLSTREAM_WATERMARK.$tableName.$COLUMN" + val SQLSTREAM_WATERMARK_DELAY = s"$SQLSTREAM_WATERMARK.$tableName.$DELAY" + val column = sqlConf.getConfString(SQLSTREAM_WATERMARK_COLUMN) + val delay = sqlConf.getConfString(SQLSTREAM_WATERMARK_DELAY) + + EventTimeWatermark( + UnresolvedAttribute(column), + CalendarInterval.fromString(s"interval $delay"), + relation + ) + } else { + + logInfo("None watermark found in sqlstreaming") + relation + } } def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala index 851acf6b5b99e..6909171b7925d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.hive.execution import java.util.concurrent.TimeUnit import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.Utils @@ -75,6 +77,7 @@ case class SQLStreamingSink(sparkSession: SparkSession, // Builder pattern config options /////////////////////////////////////////////////////////////////////////////////////// val df = Dataset.ofRows(sparkSession, child) +// val df = Dataset.ofRows(sparkSession, child) val source = 
extraOptions("source") val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode) val normalizedParCols = partitionColumnNames.map { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala index a4c3920edeebf..9e4693b5d2823 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala @@ -60,12 +60,38 @@ class ValidSQLStreaming(sparkSession: SparkSession, sqlConf: SQLConf) val partitionColumnNames = table.partitionColumnNames SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) - case None => + case None if sqlConf.sqlStreamConsoleSinkEnable => + logInfo("Enable to use ConsoleSink") // Used when user use just select stream * from table. var extraOptions = Map("numRows" -> sqlConf.sqlStreamConsoleOutputRows) extraOptions += ("source" -> "console") val partitionColumnNames = Seq() SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) + + // Sink_table is not defined, and user disable console sink + // The purpose of this part of the design is to keep synchronous with the design of + // offline Spark-sql, allowing users to submit streaming in "code + SQL", which is, + // users can generate df by querying stream_table and then customize the sink output. + // here is a little example + // val kafkaDF = + // spark.sql("select stream cast(key as string), cast(value as string) from kafka_sql_test") + // kafkaDF.writeStream + // .foreach(new ForeachWriter[Row] { + // override def process(value: Row) = { + // println(value.toSeq(1)) + // } + // override def close(errorOrNull: Throwable) = { + // } + // override def open(partitionId: Long, version: Long) = { + // true + // } + // }) + // .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) + // .start() + // .awaitTermination() + case _ => + logWarning("No sink available in sqlstreaming, user should define StructStreaming Sink") + child } } @@ -75,7 +101,7 @@ class ValidSQLStreaming(sparkSession: SparkSession, sqlConf: SQLConf) * @return logicalPlan */ private def removeUnusedWithStreamDefinition(plan: LogicalPlan): LogicalPlan = - plan.resolveOperators { + plan.resolveOperators { case logical: WithStreamDefinition => logical.child } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala index e2421021c61e8..39299d2154548 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala @@ -30,11 +30,10 @@ class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton { s"""CREATE TABLE t USING PARQUET |OPTIONS ( |PATH = '${dir.toURI}', - |location = '${dir.toURI}', - |isStreaming = 'true') - |AS SELECT 1 AS a, 2 AS b, 3 AS c - """.stripMargin - ) + |location = '${dir.toURI}', + |isStreaming = 'true') + |AS SELECT 1 AS a, 2 AS b, 3 AS c + """.stripMargin) val streamTable = catalog.getTableMetadata(TableIdentifier("t")) assert(streamTable.isStreaming) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala index 56dbb4809ec1d..80233781147ce 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala @@ -126,15 +126,32 @@ class ResolveStreamRelationSuite extends AnalysisTest with TestHiveSingleton { ) ) - test("resolve stream relations") { + test("resolve stream relations with manage table") { assertAnalysisError(UnresolvedStreamRelation(TableIdentifier("tAbLe")), Seq()) + } + + test("resolve stream relations with csv stream tabl") { checkAnalysis( UnresolvedStreamRelation(TableIdentifier("csvTable")), getStreamRelation("csv", csvOptions) ) + } + + test("resolve stream relations with parquet stream table") { checkAnalysis( UnresolvedStreamRelation(TableIdentifier("parquetTable")), getStreamRelation("parquet", parquetOptions) ) } + + test("resolve stream relations with watermark") { + val column: String = "timestamp" + val delay: String = "2 seconds" + sqlConf.setConf(SQLConf.SQLSTREAM_WATERMARK_ENABLE, true) + sqlConf.setConfString("spark.sqlstreaming.watermark.default.csvTable.column", column) + sqlConf.setConfString("spark.sqlstreaming.watermark.default.csvTable.delay", delay) + assertAnalysisSuccess( + UnresolvedStreamRelation(TableIdentifier("csvTable")) + ) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala index 49d6d1b5501ac..0e68aca5ebb7b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala @@ -42,6 +42,7 @@ class ValidSQLStreamingSuite extends ResolveStreamRelationSuite { test("select stream * from csvTable with enable=true") { sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) + sqlConf.setConf(SQLConf.SQLSTREAM_CONSOLESINK_ENABLE, true) assertAnalysisSuccess( WithStreamDefinition( UnresolvedStreamRelation(TableIdentifier("csvTable")) From a741185c97af3275aead752a96cead126be2b0c2 Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Fri, 26 Oct 2018 09:03:58 +0800 Subject: [PATCH 4/7] [SPARK-24630][SS] Support SQLStreaming in Spark: remove stream keyword --- .../spark/sql/catalyst/parser/SqlBase.g4 | 5 +- .../sql/catalyst/analysis/unresolved.scala | 14 -- .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../plans/logical/basicLogicalOperators.scala | 4 - .../apache/spark/sql/internal/SQLConf.scala | 14 -- .../spark/sql/execution/SparkSqlParser.scala | 59 +----- .../spark/sql/execution/command/ddl.scala | 4 + .../datasources/DataSourceStrategy.scala | 47 ++++- .../streaming}/SQLStreamingSink.scala | 25 +-- .../streaming/StreamingQuerySuite.scala | 91 --------- .../sql/hive/HiveSessionStateBuilder.scala | 9 - .../execution/ResolveStreamRelation.scala | 192 ------------------ .../hive/execution/ValidSQLStreaming.scala | 143 ------------- .../ResolveStreamRelationSuite.scala | 157 -------------- .../execution/ValidSQLStreamingSuite.scala | 93 --------- 15 files changed, 65 insertions(+), 796 deletions(-) rename sql/{hive/src/main/scala/org/apache/spark/sql/hive/execution => core/src/main/scala/org/apache/spark/sql/execution/streaming}/SQLStreamingSink.scala (85%) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala delete mode 100644 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a1ea18404ba43..94283f59011a8 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -390,7 +390,7 @@ querySpecification (RECORDREADER recordReader=STRING)? fromClause? (WHERE where=booleanExpression)?) - | ((kind=SELECT (stream=STREAM)? (hints+=hint)* setQuantifier? namedExpressionSeq fromClause? + | ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause? | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?) lateralView* (WHERE where=booleanExpression)? @@ -772,13 +772,12 @@ nonReserved | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN - | DATABASE | SELECT | STREAM | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT + | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | DIRECTORY | BOTH | LEADING | TRAILING ; SELECT: 'SELECT'; -STREAM: 'STREAM'; FROM: 'FROM'; ADD: 'ADD'; AS: 'AS'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 7b005badc42fe..c1ec736c32ed4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -51,20 +51,6 @@ case class UnresolvedRelation(tableIdentifier: TableIdentifier) override lazy val resolved = false } -/** - * Holds the name of a stream relation that has yet to be looked up in a catalog. - * @param tableIdentifier - */ -case class UnresolvedStreamRelation(tableIdentifier: TableIdentifier) extends LeafNode { - - /** Returns a `.` separated name for this relation. */ - def tableName: String = tableIdentifier.unquotedString - - override def output: Seq[Attribute] = Nil - - override lazy val resolved = false -} - /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a22ec584d34f5..5cfb5dc871041 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -724,7 +724,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * are defined as a number between 0 and 100. * - TABLESAMPLE(BUCKET x OUT OF y): Sample the table down to a 'x' divided by 'y' fraction. */ - protected def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { // Create a sampled plan if we need one. 
def sample(fraction: Double): Sample = { // The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling @@ -889,7 +889,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * If aliases specified in a FROM clause, create a subquery alias ([[SubqueryAlias]]) and * column aliases for a [[LogicalPlan]]. */ - protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = { + private def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = { if (tableAlias.strictIdentifier != null) { val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan) if (tableAlias.identifierList != null) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b173e75bd935d..7ff83a9be3622 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -498,10 +498,6 @@ case class WithWindowDefinition( override def output: Seq[Attribute] = child.output } -case class WithStreamDefinition(child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - /** * @param order The ordering expressions * @param global True means global sorting apply for entire data set, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 924be57cb48d9..89d02567bc4f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -636,11 +636,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val SQLSTREAM_CONSOLESINK_ENABLE = buildConf("spark.sqlstreaming.consoleSink.enable") - .doc("Whether use console sink when user does not define insert into table.") - .booleanConf - .createWithDefault(false) - val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode") .doc("The output mode used in sqlstreaming") .stringConf @@ -658,11 +653,6 @@ object SQLConf { .stringConf .createOptional - val SQLSTREAM_CONSOLE_OUTPUT_ROWS = buildConf("spark.sqlstreaming.console.numRows") - .doc("The num of rows showed to console sink in sqlstreaming") - .stringConf - .createWithDefault("20") - val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sqlstreaming.query.enable") .doc("Whether to enable use sqlstreaming in spark") .booleanConf @@ -1852,16 +1842,12 @@ class SQLConf extends Serializable with Logging { def sqlStreamWaterMarkEnable: Boolean = getConf(SQLSTREAM_WATERMARK_ENABLE) - def sqlStreamConsoleSinkEnable: Boolean = getConf(SQLSTREAM_CONSOLESINK_ENABLE) - def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE) def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER) def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME) - def sqlStreamConsoleOutputRows: String = getConf(SQLSTREAM_CONSOLE_OUTPUT_ROWS) - def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE) def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 
24fd9cd6d37a6..026b3d7bf08e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -26,7 +26,7 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStreamRelation} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ @@ -41,7 +41,7 @@ import org.apache.spark.sql.types.StructType * Concrete parser for Spark SQL statements. */ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { - val astBuilder = new SparkSqlAstBuilderExt(conf) + val astBuilder = new SparkSqlAstBuilder(conf) private val substitutor = new VariableSubstitution(conf) @@ -50,61 +50,6 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { } } -/** - * Inheriting from SparkSqlAstBuilder. - * visitQuerySpecification and visitTableName are rewritten to - * handle syntax parsing of sqlstreaming. - */ -class SparkSqlAstBuilderExt(conf: SQLConf) extends SparkSqlAstBuilder(conf) { - - import ParserUtils._ - - /** - * Mark if current sql contain stream query. This field should be set before visit table name, - * so that table will be parse to UnResolvedStreamRelation in Stream Query case. - */ - private var isStreamQuery = false - - /** - * Create a logical plan using a query specification. - * Add WithStreamDefinition plan if isStreamQuery is true - * @param ctx the parse tree - */ - override def visitQuerySpecification(ctx: QuerySpecificationContext): LogicalPlan = { - isStreamQuery = isStreamQuery || (ctx.STREAM() != null) - - val result = super.visitQuerySpecification(ctx) - - // With Stream - withStreams(ctx, result) - } - - /** - * Create an aliased table reference. This is typically used in FROM clauses. - * Define UnresolvedStreamRelation if isStreamQuery is true - * @param ctx the parse tree - */ - override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { - - val relaion = if (isStreamQuery) { - UnresolvedStreamRelation(visitTableIdentifier(ctx.tableIdentifier)) - } else { - UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier)) - } - - val table = mayApplyAliasPlan(ctx.tableAlias, relaion) - table.optionalMap(ctx.sample)(withSample) - } - - private def withStreams(ctx: QuerySpecificationContext, query: LogicalPlan): LogicalPlan = { - if (isStreamQuery) { - WithStreamDefinition(query) - } else { - query - } - } -} - /** * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index e1faecedd20ed..b5fc5854c9754 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -820,6 +820,10 @@ object DDLUtils { table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER } + def isStreamingTable(table: CatalogTable): Boolean = { + table.isStreaming && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER + } + /** * Throws a standard error for actions that require partitionProvider = hive. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c6000442fae76..5258b5e7b4f36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -32,15 +32,17 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, InsertIntoDir} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.streaming.{SQLStreamingSink, StreamingRelation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} /** * Replaces generic operations with specific variants that are designed to work with Spark @@ -221,6 +223,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast * data source. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private val sqlConf = sparkSession.sqlContext.conf + private val WATERMARK_COLUMN = "watermark.column" + private val WATERMARK_DELAY = "watermark.delay" + private def readDataSourceTable(table: CatalogTable): LogicalPlan = { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) val catalog = sparkSession.sessionState.catalog @@ -239,11 +245,42 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] options = table.storage.properties ++ pathOption, catalogTable = Some(table)) - LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) + if (table.isStreaming && sqlConf.sqlStreamQueryEnable) { + val relation = + StreamingRelation(dataSource, table.provider.get, table.schema.toAttributes) + withWatermark(relation, table) + } else { + LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) + } } }) } + /** + * Check whether the watermark is enabled. If so, add a watermark to the relation.
+ * @param relation the basic streaming relation + * @param metadata the table metadata + * @return the relation, wrapped in EventTimeWatermark when the watermark is enabled + */ + private def withWatermark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = { + if (sqlConf.sqlStreamWaterMarkEnable) { + logInfo("Using watermark in sqlstreaming") + val options = metadata.storage.properties + val column = options.getOrElse(WATERMARK_COLUMN, + throw new IllegalArgumentException(s"$WATERMARK_COLUMN is empty")) + val delay = options.getOrElse(WATERMARK_DELAY, + throw new IllegalArgumentException(s"$WATERMARK_DELAY is empty")) + EventTimeWatermark( + UnresolvedAttribute(column), + CalendarInterval.fromString(s"interval $delay"), + relation + ) + } else { + logInfo("No watermark found in sqlstreaming") + relation + } + } + private def readHiveTable(table: CatalogTable): LogicalPlan = { HiveTableRelation( table, @@ -253,6 +290,10 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, child, _, _) + if DDLUtils.isStreamingTable(tableMeta) && sqlConf.sqlStreamQueryEnable => + SQLStreamingSink(sparkSession, tableMeta, child) + case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala similarity index 85% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala index 6909171b7925d..cd2398dc1de40 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SQLStreamingSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala @@ -15,33 +15,32 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.execution +package org.apache.spark.sql.execution.streaming import java.util.concurrent.TimeUnit import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.Utils /** * The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query.
+ * * @param sparkSession * @param extraOptions * @param partitionColumnNames * @param child */ case class SQLStreamingSink(sparkSession: SparkSession, - extraOptions: Map[String, String], - partitionColumnNames: Seq[String], - child: LogicalPlan) + table: CatalogTable, + child: LogicalPlan) extends RunnableCommand { private val sqlConf = sparkSession.sqlContext.conf @@ -77,27 +76,25 @@ case class SQLStreamingSink(sparkSession: SparkSession, // Builder pattern config options /////////////////////////////////////////////////////////////////////////////////////// val df = Dataset.ofRows(sparkSession, child) -// val df = Dataset.ofRows(sparkSession, child) - val source = extraOptions("source") val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode) - val normalizedParCols = partitionColumnNames.map { + val normalizedParCols = table.partitionColumnNames.map { normalize(df, _, "Partition") } - val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) + val ds = DataSource.lookupDataSource(table.provider.get, sparkSession.sessionState.conf) val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",") - var options = extraOptions + var options = table.storage.properties val sink = ds.newInstance() match { case w: StreamingWriteSupportProvider if !disabledSources.contains(w.getClass.getCanonicalName) => val sessionOptions = DataSourceV2Utils.extractSessionConfigs( w, df.sparkSession.sessionState.conf) - options = sessionOptions ++ extraOptions + options = sessionOptions ++ options w case _ => val ds = DataSource( df.sparkSession, - className = source, + className = table.provider.get, options = options, partitionColumns = normalizedParCols) ds.createSink(outputMode) @@ -107,7 +104,7 @@ case class SQLStreamingSink(sparkSession: SparkSession, sqlConf.sqlStreamQueryName, None, df, - extraOptions, + table.storage.properties, sink, outputMode, trigger = parseTrigger() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala deleted file mode 100644 index 2665ef362d278..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamingQuerySuite.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.SparkSqlParser -import org.apache.spark.sql.execution.SparkSqlParserSuite -/** - * Used to check stream query - */ -class StreamingQuerySuite extends SparkSqlParserSuite { - - import org.apache.spark.sql.catalyst.dsl.expressions._ - import org.apache.spark.sql.catalyst.dsl.plans._ - - private lazy val parser = new SparkSqlParser(newConf) - - // change table to UnresolvedStreamRelation - def streamTable(ref: String): LogicalPlan = UnresolvedStreamRelation(TableIdentifier(ref)) - - private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = { - val normalized1 = normalizePlan(parser.parsePlan(sqlCommand)) - val normalized2 = normalizePlan(plan) - comparePlans(normalized1, normalized2) - } - - test("simple stream query") { - assertEqual("select stream * from b", - WithStreamDefinition(streamTable("b").select(star()))) - } - - test("simple stream join batch") { - // stream join batch - assertEqual("select stream * from b join s", - WithStreamDefinition( - streamTable("b").join(streamTable("s"), Inner, None).select(star()) - ) - ) - } - - test("simple subquery(stream) join batch") { - // subquery(stream) join batch - assertEqual("select stream * from (select * from b) as c join s", - WithStreamDefinition( - WithStreamDefinition(streamTable("b").select(star())).as("c") - .join(streamTable("s"), Inner, None).select(star()) - ) - ) - } - - test("simple stream join subquery(batch)") { - // stream join subquery(batch) - assertEqual("select stream * from s join (select * from b) as c", - WithStreamDefinition( - streamTable("s").join( - WithStreamDefinition(streamTable("b").select(star())).as("c"), - Inner, - None).select(star()) - ) - ) - } - - test("simple subquery(stream) join subquery(batch)") { - // subquery(stream) join subquery(batch) - assertEqual("select stream * from (select * from s) as t join (select * from b) as c", - WithStreamDefinition( - WithStreamDefinition(streamTable("s").select(star())).as("t") - .join(WithStreamDefinition(streamTable("b").select(star())).as("c"), Inner, None) - .select(star()) - ) - ) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 86c55fccc8a79..3fe49eee35d66 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.hive.execution.{ResolveStreamRelation, ValidSQLStreaming} import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} /** @@ -69,14 +68,6 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session */ override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - /** Used to resolve UnResolvedStreamRelation, which is used in SQLstreaming */ - new ResolveStreamRelation(catalog, conf, session) +: - /** - * 
Check whether sqlstreaming plan is valid and - * change InsertIntoTable to specific Stream Sink, - * or add Console Sink when user just select - */ - new ValidSQLStreaming(session, conf) +: new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala deleted file mode 100644 index c2f9f46592cfd..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelation.scala +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import java.util.Locale - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.StreamingRelation -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.types.StructType -import org.apache.spark.unsafe.types.CalendarInterval - -/** - * Used to resolve UnResolvedStreamRelaTion, which is used in sqlstreaming - * Change UnResolvedStreamRelation to StreamingRelation if table is stream_table, - * otherwise, change UnResolvedStreamRelation to HiveTableRelation or other source Relation - * @param catalog - * @param conf - * @param sparkSession - */ -class ResolveStreamRelation(catalog: SessionCatalog, - sqlConf: SQLConf, - sparkSession: SparkSession) - extends Rule[LogicalPlan] with CheckAnalysis { - - /** - * SQLStreaming watermark configuration. - * User must use spark.sqlstreaming.watermark.{db}.{table}.[column/delay] to set watermark. 
- */ - private val SQLSTREAM_WATERMARK = "spark.sqlstreaming.watermark" - private val COLUMN = "column" - private val DELAY = "delay" - - private def lookupRelation(relation: UnresolvedStreamRelation, - defaultDatabase: Option[String] = None): LogicalPlan = { - - val tableIdentWithDb = relation.tableIdentifier.copy( - database = relation.tableIdentifier.database.orElse(defaultDatabase)) - - try { - val dbName = tableIdentWithDb.database.getOrElse(catalog.getCurrentDatabase) - val db = formatDatabaseName(dbName) - val table = formatTableName(tableIdentWithDb.table) - val metadata = catalog.externalCatalog.getTable(db, table) - - if (metadata.isStreaming) { - lookupStreamingRelation(metadata) - } else { - catalog.lookupRelation(tableIdentWithDb) - } - } catch { - case _: NoSuchTableException => - relation.failAnalysis( - s"Stream Table or view not found: ${tableIdentWithDb.unquotedString}") - // If the database is defined and that database is not found, throw an AnalysisException. - // Note that if the database is not defined, it is possible we are looking up a temp view. - case e: NoSuchDatabaseException => - relation.failAnalysis(s"Stream Table or view not found: " + - s"${tableIdentWithDb.unquotedString}, the database ${e.db} doesn't exsits.") - } - } - - /** - * Format table name, taking into account case sensitivity. - */ - protected[this] def formatTableName(name: String): String = { - if (sqlConf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) - } - - /** - * Format database name, taking into account case sensitivity. - */ - protected[this] def formatDatabaseName(name: String): String = { - if (sqlConf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) - } - - /** - * Create StreamingRelation from table - * @param table - * @return - */ - private[hive] def lookupStreamingRelation(table: CatalogTable): LogicalPlan = { - table.provider match { - case Some(sourceName) => - DataSource.lookupDataSource(sourceName, sqlConf).newInstance() match { - case s: StreamSourceProvider => - createStreamingRelation(table, usingFileStreamSource = false) - case format: FileFormat => - createStreamingRelation(table, usingFileStreamSource = true) - case _ => - throw new Exception(s"Cannot find Streaming Relation for provider $sourceName") - } - case _ => - throw new Exception("Invalid provider for Streaming Relation") - } - } - - /** - * Create StreamingRelation from table for SourceProvider - * @param table - * @return - */ - private def createStreamingRelation( - table: CatalogTable, - usingFileStreamSource: Boolean): LogicalPlan = { - - /** - * Get Source or Sink meta data from table. - */ - var sourceProperties = table.storage.properties - val partitionColumnNames = table.partitionColumnNames - val sourceName = table.provider.get - sourceProperties += ("source" -> sourceName) - var userSpecifiedSchema: Option[StructType] = Some(table.schema) - - if (usingFileStreamSource) { - sourceProperties += ("path" -> table.location.getPath) - } - - val relation = StreamingRelation( - DataSource( - sparkSession, - sourceName, - userSpecifiedSchema = userSpecifiedSchema, - options = sourceProperties, - partitionColumns = partitionColumnNames - ), - sourceName, - table.schema.toAttributes - ) - - /** - * Check watermark - */ - withWaterMark(relation, table) - } - - /** - * Check watermark enable. If true, add watermark to relation. 
- * @param relation the basic streaming relation - * @param metadata table meta - * @return - */ - private def withWaterMark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = { - if (sqlConf.sqlStreamWaterMarkEnable) { - - logInfo("Using watermark in sqlstreaming") - val tableName = s"${metadata.identifier.database.get}.${metadata.identifier.table}" - val SQLSTREAM_WATERMARK_COLUMN = s"$SQLSTREAM_WATERMARK.$tableName.$COLUMN" - val SQLSTREAM_WATERMARK_DELAY = s"$SQLSTREAM_WATERMARK.$tableName.$DELAY" - val column = sqlConf.getConfString(SQLSTREAM_WATERMARK_COLUMN) - val delay = sqlConf.getConfString(SQLSTREAM_WATERMARK_DELAY) - - EventTimeWatermark( - UnresolvedAttribute(column), - CalendarInterval.fromString(s"interval $delay"), - relation - ) - } else { - - logInfo("None watermark found in sqlstreaming") - relation - } - } - - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { - case u: UnresolvedStreamRelation => - val defaultDatabase = AnalysisContext.get.defaultDatabase - lookupRelation(u, defaultDatabase) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala deleted file mode 100644 index 9e4693b5d2823..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ValidSQLStreaming.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.CheckAnalysis -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.streaming.StreamingRelation -import org.apache.spark.sql.internal.SQLConf - -/** - * Check whether sqlstreaming is valid, call failAnalysis if failed - * @param sparkSession - */ -class ValidSQLStreaming(sparkSession: SparkSession, sqlConf: SQLConf) - extends Rule[LogicalPlan] with CheckAnalysis { - - /** - * Check whether logicalPlan is valid, if valid then create SQLStreamingSink. 
- * If user use InsertIntoTable, then use table meta to create SQLStreamingSink, - * if not, then create ConsoleSink to print result to Console - * @param tableMeta the CatalogTable for stream source - * @param child the logicalPlan of dataframe - * @return - */ - private def createSQLStreamingSink(tableMeta: Option[CatalogTable], - child: LogicalPlan): LogicalPlan = { - - checkSQLStreamingEnable() - - tableMeta match { - case Some(table) => - // Used when user define the sink meta - if (!table.isStreaming) { - failAnalysis(s"Not supported to insert write into " + - s"none stream table ${table.qualifiedName}") - } - val resolveStreamRelation = new ResolveStreamRelation(sparkSession.sessionState.catalog, - sqlConf, sparkSession) - val streamingRelation = resolveStreamRelation.lookupStreamingRelation(table) - val extraOptions = streamingRelation.asInstanceOf[StreamingRelation].dataSource.options - val partitionColumnNames = table.partitionColumnNames - SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) - - case None if sqlConf.sqlStreamConsoleSinkEnable => - logInfo("Enable to use ConsoleSink") - // Used when user use just select stream * from table. - var extraOptions = Map("numRows" -> sqlConf.sqlStreamConsoleOutputRows) - extraOptions += ("source" -> "console") - val partitionColumnNames = Seq() - SQLStreamingSink(sparkSession, extraOptions, partitionColumnNames, child) - - // Sink_table is not defined, and user disable console sink - // The purpose of this part of the design is to keep synchronous with the design of - // offline Spark-sql, allowing users to submit streaming in "code + SQL", which is, - // users can generate df by querying stream_table and then customize the sink output. - // here is a little example - // val kafkaDF = - // spark.sql("select stream cast(key as string), cast(value as string) from kafka_sql_test") - // kafkaDF.writeStream - // .foreach(new ForeachWriter[Row] { - // override def process(value: Row) = { - // println(value.toSeq(1)) - // } - // override def close(errorOrNull: Throwable) = { - // } - // override def open(partitionId: Long, version: Long) = { - // true - // } - // }) - // .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) - // .start() - // .awaitTermination() - case _ => - logWarning("No sink available in sqlstreaming, user should define StructStreaming Sink") - child - } - } - - /** - * Remove unused WithStreamDefinition when use alias query - * @param plan the logicalPlan of dataframe - * @return logicalPlan - */ - private def removeUnusedWithStreamDefinition(plan: LogicalPlan): LogicalPlan = - plan.resolveOperators { - case logical: WithStreamDefinition => - logical.child - } - - /** - * Default Disable SQLStreaming. User must set true to run SQLStreaming - */ - private def checkSQLStreamingEnable(): Unit = { - if (!sqlConf.sqlStreamQueryEnable) { - failAnalysis("Disable SQLStreaming for default. 
If you want to use SQLSteaming, " + - s"set ${SQLConf.SQLSTREAM_QUERY_ENABLE.key}=true") - } - } - - /** - * Insert overwrite directory are not supported now, - * thus this program doesn't check valid insert overwrite directory - * @param plan - * @return - */ - override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.analyzed && plan.isStreaming) { - plan match { - - case InsertIntoTable(LogicalRelation(_, _, Some(tableMeta), _), _, child, _, _) - if child.resolved => - createSQLStreamingSink(Some(tableMeta), removeUnusedWithStreamDefinition(child)) - - case WithStreamDefinition(child) if child.resolved => - createSQLStreamingSink(None, removeUnusedWithStreamDefinition(child)) - - case _ => - plan - } - } else { - plan - } - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala deleted file mode 100644 index 80233781147ce..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ResolveStreamRelationSuite.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution - -import java.net.URI -import java.nio.file._ - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.StreamingRelation -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types._ - -/** - * Test whether UnresolvedStreamRelation can be transformed tp StreamRelation or LogicalRelation - */ -class ResolveStreamRelationSuite extends AnalysisTest with TestHiveSingleton { - - protected val sqlConf = hiveContext.conf - - private val tempPath: String = { - val temp = Paths.get("/tmp/somewhere") - if (!Files.exists(temp)) { - Files.createDirectory(temp) - } - temp.toString - } - - protected val csvOptions: Map[String, String] = Map( - "isStreaming" -> "true", - "source" -> "csv", - "path" -> tempPath, - "sep" -> "\t" - ) - - protected val parquetOptions: Map[String, String] = Map( - "isStreaming" -> "true", - "source" -> "parquet", - "path" -> tempPath - ) - - // the basic table schema for tests - protected val tableSchema: StructType = StructType(Seq( - StructField("key", BinaryType), - StructField("value", BinaryType), - StructField("topic", StringType), - StructField("partition", IntegerType), - StructField("offset", LongType), - StructField("timestamp", TimestampType), - StructField("timestampType", IntegerType) - )) - - /** - * Generate Catalog table - * @param tableName - * @param sourceName - * @param options - * @return - */ - protected def getCatalogTable(tableName: String, - sourceName: String, - options: Map[String, String]): CatalogTable = { - val storage = DataSource.buildStorageFormatFromOptions(options) - CatalogTable( - identifier = TableIdentifier(tableName, Some("default")), - tableType = CatalogTableType.MANAGED, - provider = Some(sourceName), - storage = storage, - schema = tableSchema, - partitionColumnNames = Nil) - } - - protected val prepareTables: Seq[CatalogTable] = Seq( - getCatalogTable("csvTable", "csv", csvOptions), - getCatalogTable("parquetTable", "parquet", parquetOptions) - ) - - // overwrite this method in order to change sqlConf in tests - override protected def getAnalyzer(caseSensitive: Boolean) = { - val conf = sqlConf.copy(SQLConf.CASE_SENSITIVE -> caseSensitive) - val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf) - catalog.createDatabase( - CatalogDatabase("default", "", new URI("loc"), Map.empty), - ignoreIfExists = false) - prepareTables.foreach{ table => - catalog.createTable(table, ignoreIfExists = true) - } - new Analyzer(catalog, conf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveStreamRelation(catalog, conf, spark) :: - new ValidSQLStreaming(spark, conf) :: - new FindDataSourceTable(spark) :: Nil - } - } - - protected def getStreamRelation( - sourceName: String, - options: Map[String, String]): StreamingRelation = - StreamingRelation( - DataSource( - sparkSession = spark, - className = sourceName, - options = options, - userSpecifiedSchema = Some(tableSchema), - partitionColumns = Seq() - ) - ) - - test("resolve stream relations with manage table") { - assertAnalysisError(UnresolvedStreamRelation(TableIdentifier("tAbLe")), Seq()) - } - - test("resolve 
stream relations with csv stream tabl") { - checkAnalysis( - UnresolvedStreamRelation(TableIdentifier("csvTable")), - getStreamRelation("csv", csvOptions) - ) - } - - test("resolve stream relations with parquet stream table") { - checkAnalysis( - UnresolvedStreamRelation(TableIdentifier("parquetTable")), - getStreamRelation("parquet", parquetOptions) - ) - } - - test("resolve stream relations with watermark") { - val column: String = "timestamp" - val delay: String = "2 seconds" - sqlConf.setConf(SQLConf.SQLSTREAM_WATERMARK_ENABLE, true) - sqlConf.setConfString("spark.sqlstreaming.watermark.default.csvTable.column", column) - sqlConf.setConfString("spark.sqlstreaming.watermark.default.csvTable.delay", delay) - assertAnalysisSuccess( - UnresolvedStreamRelation(TableIdentifier("csvTable")) - ) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala deleted file mode 100644 index 0e68aca5ebb7b..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ValidSQLStreamingSuite.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.hive.execution - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedStreamRelation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, WithStreamDefinition} -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.internal.SQLConf - -class ValidSQLStreamingSuite extends ResolveStreamRelationSuite { - - import org.apache.spark.sql.catalyst.dsl.expressions._ - import org.apache.spark.sql.catalyst.dsl.plans._ - - private val consoleSinkOptions = Map( - "numRows" -> sqlConf.sqlStreamConsoleOutputRows, - "source" -> "console" - ) - - test("select stream * from csvTable with enable=false") { - sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, false) - assertAnalysisError( - WithStreamDefinition(getStreamRelation("csv", csvOptions).select(star())), - "Disable SQLStreaming for default." 
:: Nil - ) - } - - test("select stream * from csvTable with enable=true") { - sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) - sqlConf.setConf(SQLConf.SQLSTREAM_CONSOLESINK_ENABLE, true) - assertAnalysisSuccess( - WithStreamDefinition( - UnresolvedStreamRelation(TableIdentifier("csvTable")) - ) - ) - } - - test("insert into csvTable select stream * from parquetTable") { - sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) - assertAnalysisSuccess( - InsertIntoTable( - LogicalRelation( - null, - Seq(), - Some(getCatalogTable("csvTable", "csv", csvOptions)), - isStreaming = false - ), - Map(), - WithStreamDefinition( - UnresolvedStreamRelation(TableIdentifier("parquetTable")) - ), - overwrite = false, - ifPartitionNotExists = false - ) - ) - } - - test("insert into csvTable select stream * from parquetTable with enable=true") { - sqlConf.setConf(SQLConf.SQLSTREAM_QUERY_ENABLE, true) - assertAnalysisError( - InsertIntoTable( - LogicalRelation( - null, - Seq(), - Some(getCatalogTable("csvTable", "csv", csvOptions.filterNot(_._1 == "isStreaming"))), - isStreaming = false - ), - Map(), - WithStreamDefinition( - UnresolvedStreamRelation(TableIdentifier("parquetTable")) - ), - overwrite = false, - ifPartitionNotExists = false - ), - "Not supported to insert write into none stream table" :: Nil - ) - } -} From 0392de7f65f8fb8093b5e2910c9616247ee0ba42 Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Fri, 26 Oct 2018 09:07:57 +0800 Subject: [PATCH 5/7] [SPARK-24630][SS] Support SQLStreaming in Spark: remove unused changes --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 1 - .../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 026b3d7bf08e1..89cb63784c0f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -26,7 +26,6 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 3fe49eee35d66..2882672f327c4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -68,7 +68,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session */ override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveHiveSerdeTable(session) +: + new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: customResolutionRules From 61504637e022d36ae53ee8482ed47ab882ccad99 Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Thu, 6 Dec 2018 17:18:12 +0800 Subject: [PATCH 6/7] [SPARK-24630][SS] Support SQLStreaming in Spark: change microseconds to milliseconds --- 
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 +++++----- .../sql/execution/streaming/SQLStreamingSink.scala | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c8b148be84536..8f86b472b9373 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1085,7 +1085,7 @@ private[spark] object Utils extends Logging { } /** - * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If + * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If * no suffix is provided, the passed number is assumed to be in ms. */ def timeStringAsMs(str: String): Long = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 89d02567bc4f1..572370c70191c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -631,29 +631,29 @@ object SQLConf { .intConf .createWithDefault(200) - val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable") + val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sql.streaming.watermark.enable") .doc("Whether use watermark in sqlstreaming.") .booleanConf .createWithDefault(false) - val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode") + val SQLSTREAM_OUTPUTMODE = buildConf("spark.sql.streaming.outputMode") .doc("The output mode used in sqlstreaming") .stringConf .createWithDefault("append") - val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger") + val SQLSTREAM_TRIGGER = buildConf("spark.sql.streaming.trigger") .doc("The structstreaming trigger used in sqlstreaming") .stringConf .createWithDefault("0s") - val SQLSTREAM_QUERY_NAME = buildConf("spark.sqlstreaming.queryName") + val SQLSTREAM_QUERY_NAME = buildConf("spark.sql.streaming.queryName") .doc("The structstreaming query name used in sqlstreaming. 
" + "User must use spark.sql.streaming.checkpointLocation and " + "spark.sqlstreaming.queryName to ensure the unique checkpointLocation") .stringConf .createOptional - val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sqlstreaming.query.enable") + val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sql.streaming.query.enable") .doc("Whether to enable use sqlstreaming in spark") .booleanConf .createWithDefault(false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala index cd2398dc1de40..4cce27b53ff25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala @@ -62,7 +62,7 @@ case class SQLStreamingSink(sparkSession: SparkSession, */ private def parseTrigger(): Trigger = { val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger) - Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS) + Trigger.ProcessingTime(trigger, TimeUnit.MILLISECONDS) } /** From e735938c8e3a8a19bf833b1fb62b44373142805c Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Thu, 6 Dec 2018 17:18:12 +0800 Subject: [PATCH 7/7] [SPARK-24630][SS] Support SQLStreaming in Spark: change microseconds to milliseconds --- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index b9ee878913152..2a400642ee8fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}