diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 817abebd72ac0..6364f2377e7ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -259,6 +259,11 @@ case class CatalogTable(
     StructType(partitionFields)
   }
 
+  /** Return true if the table is a streaming table. */
+  def isStreaming: Boolean = {
+    provider.isDefined && storage.properties.getOrElse("isStreaming", "false").toBoolean
+  }
+
   /**
    * schema of this table's data columns
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 86e068bf632bd..4d566f0c1c4f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -637,6 +637,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
 
+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sql.streaming.watermark.enable")
+    .doc("Whether to use watermarks in SQLStreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sql.streaming.outputMode")
+    .doc("The output mode used in SQLStreaming.")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sql.streaming.trigger")
+    .doc("The Structured Streaming trigger interval used in SQLStreaming.")
+    .stringConf
+    .createWithDefault("0s")
+
+  val SQLSTREAM_QUERY_NAME = buildConf("spark.sql.streaming.queryName")
+    .doc("The Structured Streaming query name used in SQLStreaming. " +
+      "Users must set spark.sql.streaming.checkpointLocation together with " +
+      "spark.sql.streaming.queryName to ensure a unique checkpoint location.")
+    .stringConf
+    .createOptional
+
+  val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sql.streaming.query.enable")
+    .doc("Whether to enable SQLStreaming in Spark.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
     .doc("The default data source to use in input/output.")
@@ -1884,6 +1911,16 @@ class SQLConf extends Serializable with Logging {
 
   def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
 
+  def sqlStreamWaterMarkEnable: Boolean = getConf(SQLSTREAM_WATERMARK_ENABLE)
+
+  def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE)
+
+  def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER)
+
+  def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME)
+
+  def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE)
+
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 096481f68275d..412db1ce20bbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -820,6 +820,10 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
   }
 
+  def isStreamingTable(table: CatalogTable): Boolean = {
+    table.isStreaming && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
+  }
+
   def readHiveTable(table: CatalogTable): HiveTableRelation = {
     HiveTableRelation(
       table,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b5cf8c9515bfb..c711e6477f240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,13 +34,16 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.streaming.{SQLStreamingSink, StreamingRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * Replaces generic operations with specific variants that are designed to work with Spark
@@ -221,6 +224,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
  * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  private val sqlConf = sparkSession.sqlContext.conf
+  private val WATERMARK_COLUMN = "watermark.column"
+  private val WATERMARK_DELAY = "watermark.delay"
+
   private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
@@ -239,12 +246,55 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           options = table.storage.properties ++ pathOption,
           catalogTable = Some(table))
 
-        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
+        if (table.isStreaming && sqlConf.sqlStreamQueryEnable) {
+          val relation =
+            StreamingRelation(dataSource, table.provider.get, table.schema.toAttributes)
+          withWatermark(relation, table)
+        } else {
+          LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
+        }
       }
     })
   }
 
+  /**
+   * Checks whether watermarking is enabled. If so, wraps the relation in an EventTimeWatermark.
+   * @param relation the base streaming relation
+   * @param metadata the catalog metadata of the streaming table
+   * @return the relation, wrapped in a watermark when one is configured
+   */
+  private def withWatermark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = {
+    if (sqlConf.sqlStreamWaterMarkEnable) {
+      logInfo("Using watermark in sqlstreaming")
+      val options = metadata.storage.properties
+      val column = options.getOrElse(WATERMARK_COLUMN,
+        throw new IllegalArgumentException(s"$WATERMARK_COLUMN is empty"))
+      val delay = options.getOrElse(WATERMARK_DELAY,
+        throw new IllegalArgumentException(s"$WATERMARK_DELAY is empty"))
+      EventTimeWatermark(
+        UnresolvedAttribute(column),
+        CalendarInterval.fromString(s"interval $delay"),
+        relation
+      )
+    } else {
+      logInfo("No watermark found in sqlstreaming")
+      relation
+    }
+  }
+
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, child, _, _)
+        if DDLUtils.isStreamingTable(tableMeta) && sqlConf.sqlStreamQueryEnable =>
+      SQLStreamingSink(sparkSession, tableMeta, child)
+
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala
new file mode 100644
index 0000000000000..4cce27b53ff25
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, which uses RunnableCommand.run to start a
+ * streaming query.
+ *
+ * @param sparkSession the active SparkSession
+ * @param table the catalog table that describes the streaming sink
+ * @param child the logical plan to run as a streaming query
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+    table: CatalogTable,
+    child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column names if we were in
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parses spark.sql.streaming.trigger into a processing-time Trigger.
+   */
+  private def parseTrigger(): Trigger = {
+    val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+    Trigger.ProcessingTime(trigger, TimeUnit.MILLISECONDS)
+  }
+
+  /**
+   * Runs via queryExecution.executeCollect().
+   * @param sparkSession the active SparkSession
+   * @return an empty Seq, the same as other DDL commands
+   */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+
+    ///////////////////////////////////////////////////////////////////////////////////////
+    // Builder pattern config options
+    ///////////////////////////////////////////////////////////////////////////////////////
+    val df = Dataset.ofRows(sparkSession, child)
+    val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode)
+    val normalizedParCols = table.partitionColumnNames.map {
+      normalize(df, _, "Partition")
+    }
+
+    val ds = DataSource.lookupDataSource(table.provider.get, sparkSession.sessionState.conf)
+    val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+    var options = table.storage.properties
+    val sink = ds.newInstance() match {
+      case w: StreamingWriteSupportProvider
+          if !disabledSources.contains(w.getClass.getCanonicalName) =>
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          w, df.sparkSession.sessionState.conf)
+        options = sessionOptions ++ options
+        w
+      case _ =>
+        val ds = DataSource(
+          df.sparkSession,
+          className = table.provider.get,
+          options = options,
+          partitionColumns = normalizedParCols)
+        ds.createSink(outputMode)
+    }
+
+    sparkSession.sessionState.streamingQueryManager.startQuery(
+      sqlConf.sqlStreamQueryName,
+      None,
+      df,
+      table.storage.properties,
+      sink,
+      outputMode,
+      trigger = parseTrigger()
+    ).awaitTermination()
+
+    Seq.empty[Row]
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index c1178ad4a84fb..540b28b4e34cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -297,6 +297,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
       val tableProperties = tableMetaToTableProps(table)
 
+      if (table.isStreaming) {
+        tableProperties.put(DATASOURCE_STREAM_TABLE, "true")
+      }
+
       // put table provider and partition provider in table properties.
       tableProperties.put(DATASOURCE_PROVIDER, provider)
       if (table.tracksPartitionsInCatalog) {
@@ -1332,6 +1336,7 @@ object HiveExternalCatalog {
   val HIVE_GENERATED_TABLE_PROPERTIES = Set(DDL_TIME)
   val HIVE_GENERATED_STORAGE_PROPERTIES = Set(SERIALIZATION_FORMAT)
 
+  val DATASOURCE_STREAM_TABLE = DATASOURCE_PREFIX + "isStreaming"
 
   // When storing data source tables in hive metastore, we need to set data schema to empty if the
   // schema is hive-incompatible. However we need a hack to preserve existing behavior. Before
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
new file mode 100644
index 0000000000000..39299d2154548
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+             |location = '${dir.toURI}',
+             |isStreaming = 'true')
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin)
+        val streamTable = catalog.getTableMetadata(TableIdentifier("t"))
+        assert(streamTable.isStreaming)
+      }
+    }
+  }
+}
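
For context, a rough end-to-end usage sketch of how the pieces in this patch are intended to fit together: a source and a sink table are created with the isStreaming = 'true' option, the new spark.sql.streaming.* configs are set, and an INSERT INTO ... SELECT over the streaming tables is rewritten into SQLStreamingSink, which starts a blocking streaming query. This sketch is not part of the patch; the table names, Kafka options, paths, and checkpoint location are illustrative assumptions, while the config keys and the isStreaming / watermark.column / watermark.delay option names come from the changes above.

// Illustrative sketch only (not part of this patch). Topic, table names, and
// paths are made up; only the config keys and table options come from the patch.
import org.apache.spark.sql.SparkSession

object SQLStreamingUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sqlstreaming-usage-sketch")
      // Turn on the SQLStreaming path and configure the query.
      .config("spark.sql.streaming.query.enable", "true")
      .config("spark.sql.streaming.outputMode", "append")
      .config("spark.sql.streaming.trigger", "5s")
      .config("spark.sql.streaming.watermark.enable", "true")
      .config("spark.sql.streaming.queryName", "events_to_parquet")
      .config("spark.sql.streaming.checkpointLocation", "/tmp/sqlstreaming/ckpt")
      .enableHiveSupport()
      .getOrCreate()

    // Source table: isStreaming = 'true' makes FindDataSourceTable resolve it to a
    // StreamingRelation; watermark.column / watermark.delay feed EventTimeWatermark.
    spark.sql(
      """CREATE TABLE kafka_events
        |USING kafka
        |OPTIONS (
        |  'kafka.bootstrap.servers' = 'localhost:9092',
        |  'subscribe' = 'events',
        |  'isStreaming' = 'true',
        |  'watermark.column' = 'timestamp',
        |  'watermark.delay' = '10 seconds')
      """.stripMargin)

    // Sink table: also marked as streaming, so the INSERT below is rewritten into
    // SQLStreamingSink instead of a batch insert.
    spark.sql(
      """CREATE TABLE parquet_events (key STRING, value STRING)
        |USING parquet
        |OPTIONS (
        |  'path' = '/tmp/sqlstreaming/output',
        |  'isStreaming' = 'true')
      """.stripMargin)

    // Starts the streaming query via SQLStreamingSink.run and blocks on awaitTermination.
    spark.sql(
      """INSERT INTO parquet_events
        |SELECT CAST(key AS STRING) AS key, CAST(value AS STRING) AS value
        |FROM kafka_events
      """.stripMargin)
  }
}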