Skip to content

Commit

Permalink
[SPARK-45807][SQL] Improve ViewCatalog API
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Follow-up API improvements based on comments from apache#43677

### Why are the changes needed?

Required for DataSourceV2 view support.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

N/A

Closes apache#44330 from nastra/SPARK-45807-api-improvements.

Authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
nastra authored and cloud-fan committed Dec 15, 2023
1 parent 3d447c2 commit 69de04f
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
import org.apache.spark.sql.types.StructType;

/**
* Catalog methods for working with views.
Expand Down Expand Up @@ -116,113 +114,38 @@ default boolean viewExists(Identifier ident) {
/**
* Create a view in the catalog.
*
* @param ident a view identifier
* @param sql the SQL text that defines the view
* @param currentCatalog the current catalog
* @param currentNamespace the current namespace
* @param schema the view query output schema
* @param queryColumnNames the query column names
* @param columnAliases the column aliases
* @param columnComments the column comments
* @param properties the view properties
* @param viewInfo the info class holding all view information
* @return the view created
* @throws ViewAlreadyExistsException If a view or table already exists for the identifier
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
View createView(
Identifier ident,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties) throws ViewAlreadyExistsException, NoSuchNamespaceException;
View createView(ViewInfo viewInfo) throws ViewAlreadyExistsException, NoSuchNamespaceException;

/**
* Replace a view in the catalog.
* <p>
* The default implementation has a race condition.
* Catalogs are encouraged to implement this operation atomically.
*
* @param ident a view identifier
* @param sql the SQL text that defines the view
* @param currentCatalog the current catalog
* @param currentNamespace the current namespace
* @param schema the view query output schema
* @param queryColumnNames the query column names
* @param columnAliases the column aliases
* @param columnComments the column comments
* @param properties the view properties
* @param viewInfo the info class holding all view information
* @param orCreate create the view if it doesn't exist
* @throws NoSuchViewException If the view doesn't exist or is a table
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
default void replaceView(
Identifier ident,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties) throws NoSuchViewException, NoSuchNamespaceException {
if (viewExists(ident)) {
dropView(ident);
try {
createView(ident, sql, currentCatalog, currentNamespace, schema,
queryColumnNames, columnAliases, columnComments, properties);
}
catch (ViewAlreadyExistsException e) {
throw new RuntimeException("Race condition when dropping and creating view", e);
}
} else {
throw new NoSuchViewException(ident);
ViewInfo viewInfo,
boolean orCreate)
throws NoSuchViewException, NoSuchNamespaceException {
if (viewExists(viewInfo.ident())) {
dropView(viewInfo.ident());
} else if (!orCreate) {
throw new NoSuchViewException(viewInfo.ident());
}
}

/**
* Create or replace a view in the catalog.
* <p>
* The default implementation has race conditions.
* Catalogs are encouraged to implement this operation atomically.
*
* @param ident a view identifier
* @param sql the SQL text that defines the view
* @param currentCatalog the current catalog
* @param currentNamespace the current namespace
* @param schema the view query output schema
* @param queryColumnNames the query column names
* @param columnAliases the column aliases
* @param columnComments the column comments
* @param properties the view properties
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
default void createOrReplaceView(
Identifier ident,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties) throws NoSuchNamespaceException {
if (viewExists(ident)) {
try {
replaceView(ident, sql, currentCatalog, currentNamespace, schema,
queryColumnNames, columnAliases, columnComments, properties);
} catch (NoSuchViewException e) {
throw new RuntimeException("Race condition when checking and replacing view", e);
}
} else {
try {
createView(ident, sql, currentCatalog, currentNamespace, schema,
queryColumnNames, columnAliases, columnComments, properties);
} catch (ViewAlreadyExistsException e) {
throw new RuntimeException("Race condition when checking and creating view", e);
}
try {
createView(viewInfo);
} catch (ViewAlreadyExistsException e) {
throw new RuntimeException("Race condition when creating/replacing view", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.sql.types.StructType;

import javax.annotation.Nonnull;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;

/**
* A class that holds view information.
*/
@DeveloperApi
public class ViewInfo {
private final Identifier ident;
private final String sql;
private final String currentCatalog;
private final String[] currentNamespace;
private final StructType schema;
private final String[] queryColumnNames;
private final String[] columnAliases;
private final String[] columnComments;
private final Map<String, String> properties;

public ViewInfo(
Identifier ident,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties) {
this.ident = ident;
this.sql = sql;
this.currentCatalog = currentCatalog;
this.currentNamespace = currentNamespace;
this.schema = schema;
this.queryColumnNames = queryColumnNames;
this.columnAliases = columnAliases;
this.columnComments = columnComments;
this.properties = properties;
}

/**
* @return The view identifier
*/
@Nonnull
public Identifier ident() {
return ident;
}

/**
* @return The SQL text that defines the view
*/
@Nonnull
public String sql() {
return sql;
}

/**
* @return The current catalog
*/
@Nonnull
public String currentCatalog() {
return currentCatalog;
}

/**
* @return The current namespace
*/
@Nonnull
public String[] currentNamespace() {
return currentNamespace;
}

/**
* @return The view query output schema
*/
@Nonnull
public StructType schema() {
return schema;
}

/**
* @return The query column names
*/
@Nonnull
public String[] queryColumnNames() {
return queryColumnNames;
}

/**
* @return The column aliases
*/
@Nonnull
public String[] columnAliases() {
return columnAliases;
}

/**
* @return The column comments
*/
@Nonnull
public String[] columnComments() {
return columnComments;
}

/**
* @return The view properties
*/
@Nonnull
public Map<String, String> properties() {
return properties;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ViewInfo viewInfo = (ViewInfo) o;
return ident.equals(viewInfo.ident) && sql.equals(viewInfo.sql) &&
currentCatalog.equals(viewInfo.currentCatalog) &&
Arrays.equals(currentNamespace, viewInfo.currentNamespace) &&
schema.equals(viewInfo.schema) &&
Arrays.equals(queryColumnNames, viewInfo.queryColumnNames) &&
Arrays.equals(columnAliases, viewInfo.columnAliases) &&
Arrays.equals(columnComments, viewInfo.columnComments) &&
properties.equals(viewInfo.properties);
}

@Override
public int hashCode() {
int result = Objects.hash(ident, sql, currentCatalog, schema, properties);
result = 31 * result + Arrays.hashCode(currentNamespace);
result = 31 * result + Arrays.hashCode(queryColumnNames);
result = 31 * result + Arrays.hashCode(columnAliases);
result = 31 * result + Arrays.hashCode(columnComments);
return result;
}

@Override
public String toString() {
return new StringJoiner(", ", ViewInfo.class.getSimpleName() + "[", "]")
.add("ident=" + ident)
.add("sql='" + sql + "'")
.add("currentCatalog='" + currentCatalog + "'")
.add("currentNamespace=" + Arrays.toString(currentNamespace))
.add("schema=" + schema)
.add("queryColumnNames=" + Arrays.toString(queryColumnNames))
.add("columnAliases=" + Arrays.toString(columnAliases))
.add("columnComments=" + Arrays.toString(columnComments))
.add("properties=" + properties)
.toString();
}
}

0 comments on commit 69de04f

Please sign in to comment.