Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-39800][SQL][WIP] DataSourceV2: View Support #44197

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,11 @@
"CREATE TEMPORARY FUNCTION with IF NOT EXISTS is not allowed."
]
},
"DESCRIBE_PARTITION_FOR_VIEW" : {
"message" : [
"DESCRIBE does not support partition for view."
]
},
"EMPTY_PARTITION_VALUE" : {
"message" : [
"Partition key <partKey> must set value."
Expand Down Expand Up @@ -2479,6 +2484,12 @@
],
"sqlState" : "42K09"
},
"INVALID_VIEW_CURRENT_CATALOG" : {
"message" : [
"Invalid current catalog <currentCatalog> in view <viewName>."
],
"sqlState" : "XX000"
},
"INVALID_VIEW_TEXT" : {
"message" : [
"The view <viewName> cannot be displayed due to invalid view text: <viewText>. This may be caused by an unauthorized modification of the view or an incorrect query syntax. Please check your query syntax and verify that the view has not been tampered with."
Expand Down Expand Up @@ -2801,6 +2812,12 @@
],
"sqlState" : "0A000"
},
"NOT_SUPPORTED_COMMAND_FOR_V2_VIEW" : {
"message" : [
"<cmd> is not supported for v2 views."
],
"sqlState" : "0A000"
},
"NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" : {
"message" : [
"<cmd> is not supported, if you want to enable it, please set \"spark.sql.catalogImplementation\" to \"hive\"."
Expand Down Expand Up @@ -4420,11 +4437,6 @@
"<quoted> is not a temp view of streaming logical plan, please use batch API such as `DataFrameReader.table` to read it."
]
},
"_LEGACY_ERROR_TEMP_1011" : {
"message" : [
"Writing into a view is not allowed. View: <identifier>."
]
},
"_LEGACY_ERROR_TEMP_1012" : {
"message" : [
"Cannot write into v1 table: <identifier>."
Expand Down
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-invalid-sql-syntax-error-class.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ CREATE TEMPORARY FUNCTION with specifying a database(`<database>`) is not allowe

CREATE TEMPORARY FUNCTION with IF NOT EXISTS is not allowed.

## DESCRIBE_PARTITION_FOR_VIEW

DESCRIBE does not support partition for view.

## EMPTY_PARTITION_VALUE

Partition key `<partKey>` must set value.
Expand Down
12 changes: 12 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,12 @@ Invalid usage of `<elem>` in `<prettyName>`.

Variable type must be string type but got `<varType>`.

### INVALID_VIEW_CURRENT_CATALOG

[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)

Invalid current catalog `<currentCatalog>` in view `<viewName>`.

### INVALID_VIEW_TEXT

[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)
Expand Down Expand Up @@ -1610,6 +1616,12 @@ ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing `<table>`'s column

`<cmd>` is not supported for v2 tables.

### NOT_SUPPORTED_COMMAND_FOR_V2_VIEW

[SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported)

`<cmd>` is not supported for v2 views.

### NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT

[SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
errorOnExceed = true,
maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)

/**
* Override to provide additional rules for the "Substitution" batch.
*/
val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Override to provide additional rules for the "Resolution" batch.
*/
Expand All @@ -256,16 +261,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
new SubstituteExecuteImmediate(catalogManager),
new SubstituteExecuteImmediate(catalogManager) +:
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
OptimizeUpdateFields +:
CTESubstitution +:
WindowsSubstitution +:
EliminateUnions +:
SubstituteUnresolvedOrdinals +:
extendedSubstitutionRules : _*),
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
Batch("Hints", fixedPoint,
Expand Down Expand Up @@ -1054,7 +1060,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
case view @ View(desc, isTempView, child) if !child.resolved =>
case view @ View(V1ViewDescription(desc), isTempView, child) if !child.resolved =>
// Resolve all the UnresolvedRelations and Views in the child.
val newChild = AnalysisContext.withAnalysisContext(desc) {
val nestedViewDepth = AnalysisContext.get.nestedViewDepth
Expand Down Expand Up @@ -1106,8 +1112,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
write.table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).map(unwrapRelationPlan).map {
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
case v: View => throw QueryCompilationErrors.expectTableNotViewError(
v.desc.identifier.nameParts, "V2WRITE", false, u)
case r: DataSourceV2Relation => write.withNewTable(r)
case u: UnresolvedCatalogRelation =>
throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
Expand Down Expand Up @@ -1140,7 +1146,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}.getOrElse(u)

case u @ UnresolvedView(identifier, cmd, allowTemp, suggestAlternative) =>
lookupTableOrView(identifier, viewOnly = true).map {
lookupTableOrView(identifier).map {
case _: ResolvedTempView if !allowTemp =>
throw QueryCompilationErrors.expectPermanentViewNotTempViewError(
identifier, cmd, u)
Expand Down Expand Up @@ -1187,32 +1193,38 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
* Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
* for resolving DDL and misc commands.
*/
private def lookupTableOrView(
identifier: Seq[String],
viewOnly: Boolean = false): Option[LogicalPlan] = {
private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = {
lookupTempView(identifier).map { tempView =>
ResolvedTempView(identifier.asIdentifier, tempView.tableMeta)
}.orElse {
expandIdentifier(identifier) match {
case CatalogAndIdentifier(catalog, ident) =>
if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) {
throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
}
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
val v1Ident = v1Table.catalogTable.identifier
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
ResolvedPersistentView(
catalog, v2Ident, v1Table.catalogTable)
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}
case _ => None
case SessionCatalogAndIdentifier(catalog, ident) =>
lookupTable(catalog, ident)
case NonSessionCatalogAndIdentifier(catalog, ident) =>
lookupView(catalog, ident)
.orElse(lookupTable(catalog, ident))
case _ =>
None
}
}
}

private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
CatalogV2Util.loadView(catalog, ident)
.map(V2ViewDescription(ident, _))
.map(ResolvedPersistentView(catalog.asViewCatalog, ident, _))

private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
val v1Ident = v1Table.catalogTable.identifier
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
ResolvedPersistentView(catalog, v2Ident, V1ViewDescription(v1Table.catalogTable))
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}

private def createRelation(
catalog: CatalogPlugin,
ident: Identifier,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.{Identifier, View, ViewCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ArrayImplicits.SparkArrayOps

/**
 * A catalog-independent description of a SQL view, implemented by
 * [[V1ViewDescription]] (backed by a `CatalogTable`) and [[V2ViewDescription]]
 * (backed by a v2 catalog [[View]]), so that view resolution can treat both
 * kinds of view uniformly.
 */
trait ViewDescription {

  /** Identifier of the view within its catalog. */
  val ident: Identifier

  // For backwards compatibility, we need to keep the `identifier` as a `TableIdentifier`.
  val identifier: TableIdentifier

  /** The SQL text of the view's defining query, if stored in the metadata. */
  val viewText: Option[String]

  /** The catalog and namespace recorded in the view's metadata. */
  val viewCatalogAndNamespace: Seq[String]

  /** Column names associated with the view's defining query. */
  val viewQueryColumnNames: Seq[String]

  /** SQL configurations stored with the view (empty for v2 views). */
  val viewSQLConfigs: Map[String, String]

  /** The view's output schema. */
  val schema: StructType

  /** View properties; for v2 views, catalog-reserved properties are excluded. */
  val properties: Map[String, String]

  /** The defining query text, or an empty string when `viewText` is absent. */
  def query: String = viewText.getOrElse("")

  // NOTE(review): for v2 views `properties` strips ViewCatalog.RESERVED_PROPERTIES
  // (see V2ViewDescription). If PROP_COMMENT / PROP_OWNER /
  // PROP_CREATE_ENGINE_VERSION are among the reserved keys, the lookups below
  // would always return None for v2 views — verify against the ViewCatalog API.
  def comment: Option[String] = properties.get(ViewCatalog.PROP_COMMENT)

  def owner: Option[String] = properties.get(ViewCatalog.PROP_OWNER)

  def createEngineVersion: Option[String] = properties.get(ViewCatalog.PROP_CREATE_ENGINE_VERSION)
}

/**
 * A [[ViewDescription]] backed by a v1 [[CatalogTable]].
 *
 * Every member is a straight projection of the corresponding field of the
 * underlying catalog table.
 *
 * @param metadata the catalog table describing the view
 */
case class V1ViewDescription(metadata: CatalogTable) extends ViewDescription {

  // Session-catalog `TableIdentifier`, kept as-is for v1 compatibility.
  override val identifier: TableIdentifier = metadata.identifier

  // The same identifier, exposed in v2 multi-part form.
  override val ident: Identifier = metadata.identifier.nameParts.asIdentifier

  override val schema: StructType = metadata.schema

  override val viewText: Option[String] = metadata.viewText

  override val viewCatalogAndNamespace: Seq[String] = metadata.viewCatalogAndNamespace

  override val viewQueryColumnNames: Seq[String] = metadata.viewQueryColumnNames

  override val viewSQLConfigs: Map[String, String] = metadata.viewSQLConfigs

  override val properties: Map[String, String] = metadata.properties
}

/**
 * A [[ViewDescription]] backed by a [[View]] loaded from a v2 catalog.
 *
 * @param ident the view's identifier in the v2 catalog
 * @param view the v2 view metadata
 */
case class V2ViewDescription(
    override val ident: Identifier,
    view: View) extends ViewDescription {

  // Legacy `TableIdentifier` form of the v2 identifier.
  override val identifier: TableIdentifier = ident.asTableIdentifier

  // `Option(...)` guards against a null query string from the connector.
  override val viewText: Option[String] = Option(view.query)

  // The catalog name followed by the namespace parts captured in the view.
  override val viewCatalogAndNamespace: Seq[String] =
    Seq(view.currentCatalog) ++ view.currentNamespace

  // NOTE(review): derived from the view's output schema rather than from the
  // defining query; confirm the v2 View API does not expose dedicated query
  // column names.
  override val viewQueryColumnNames: Seq[String] = view.schema.fieldNames.toImmutableArraySeq

  // v2 views do not carry captured SQL configurations.
  override val viewSQLConfigs: Map[String, String] = Map.empty

  override val schema: StructType = view.schema

  // Drop catalog-reserved properties so only user-facing ones remain.
  override val properties: Map[String, String] =
    view.properties.asScala.toMap.removedAll(ViewCatalog.RESERVED_PROPERTIES.asScala)
}
Loading