Skip to content

Commit

Permalink
Core: Add View support for REST catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 13, 2023
1 parent 82af1f9 commit 59bde7f
Show file tree
Hide file tree
Showing 24 changed files with 2,473 additions and 22 deletions.
119 changes: 119 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/ViewSessionCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

/** A session Catalog API for view create, drop, and load operations. */
public interface ViewSessionCatalog {

/**
* Return the name for this catalog.
*
* @return this catalog's name
*/
String name();

/**
* Return all the identifiers under this namespace.
*
* @param namespace a namespace
* @return a list of identifiers for views
* @throws NoSuchNamespaceException if the namespace is not found
*/
List<TableIdentifier> listViews(SessionCatalog.SessionContext context, Namespace namespace);

/**
* Load a view.
*
* @param identifier a view identifier
* @return instance of {@link View} implementation referred by the identifier
* @throws NoSuchViewException if the view does not exist
*/
View loadView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Check whether view exists.
*
* @param identifier a view identifier
* @return true if the view exists, false otherwise
*/
default boolean viewExists(SessionCatalog.SessionContext context, TableIdentifier identifier) {
try {
loadView(context, identifier);
return true;
} catch (NoSuchViewException e) {
return false;
}
}

/**
* Instantiate a builder to create or replace a SQL view.
*
* @param identifier a view identifier
* @return a view builder
*/
ViewBuilder buildView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Drop a view.
*
* @param identifier a view identifier
* @return true if the view was dropped, false if the view did not exist
*/
boolean dropView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Rename a view.
*
* @param from identifier of the view to rename
* @param to new view identifier
* @throws NoSuchViewException if the "from" view does not exist
* @throws AlreadyExistsException if the "to" view already exists
*/
void renameView(SessionCatalog.SessionContext context, TableIdentifier from, TableIdentifier to);

/**
* Invalidate cached view metadata from current catalog.
*
* <p>If the view is already loaded or cached, drop cached data. If the view does not exist or is
* not cached, do nothing.
*
* @param identifier a view identifier
*/
default void invalidateView(SessionCatalog.SessionContext context, TableIdentifier identifier) {}

/**
* Initialize a view catalog given a custom name and a map of catalog properties.
*
* <p>A custom view catalog implementation must have a no-arg constructor. A compute engine like
* Spark or Flink will first initialize the catalog without any arguments, and then call this
* method to complete catalog initialization with properties passed into the engine.
*
* @param name a custom name for the catalog
* @param properties catalog properties
*/
default void initialize(String name, Map<String, String> properties) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

public abstract class BaseSessionCatalog implements SessionCatalog {
public abstract class BaseSessionCatalog implements SessionCatalog, ViewSessionCatalog {
private final Cache<String, Catalog> catalogs =
Caffeine.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

Expand Down Expand Up @@ -62,7 +64,7 @@ public <T> T withContext(SessionContext context, Function<Catalog, T> task) {
return task.apply(asCatalog(context));
}

public class AsCatalog implements Catalog, SupportsNamespaces {
public class AsCatalog implements Catalog, SupportsNamespaces, ViewCatalog {
private final SessionContext context;

private AsCatalog(SessionContext context) {
Expand All @@ -84,6 +86,11 @@ public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
return BaseSessionCatalog.this.buildTable(context, ident, schema);
}

@Override
public void initialize(String catalogName, Map<String, String> props) {
BaseSessionCatalog.this.initialize(catalogName, props);
}

@Override
public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation);
Expand Down Expand Up @@ -159,5 +166,40 @@ public boolean removeProperties(Namespace namespace, Set<String> removals) {
public boolean namespaceExists(Namespace namespace) {
return BaseSessionCatalog.this.namespaceExists(context, namespace);
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
return BaseSessionCatalog.this.listViews(context, namespace);
}

@Override
public View loadView(TableIdentifier identifier) {
return BaseSessionCatalog.this.loadView(context, identifier);
}

@Override
public boolean viewExists(TableIdentifier identifier) {
return BaseSessionCatalog.this.viewExists(context, identifier);
}

@Override
public ViewBuilder buildView(TableIdentifier identifier) {
return BaseSessionCatalog.this.buildView(context, identifier);
}

@Override
public boolean dropView(TableIdentifier identifier) {
return BaseSessionCatalog.this.dropView(context, identifier);
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
BaseSessionCatalog.this.renameView(context, from, to);
}

@Override
public void invalidateView(TableIdentifier identifier) {
BaseSessionCatalog.this.invalidateView(context, identifier);
}
}
}
114 changes: 114 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,37 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.CreateViewRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ImmutableLoadViewResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.SQLViewRepresentation;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewOperations;

public class CatalogHandlers {
private static final Schema EMPTY_SCHEMA = new Schema();
Expand Down Expand Up @@ -374,4 +385,107 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {

return ops.current();
}

public static BaseView baseView(View view) {
Preconditions.checkArgument(view instanceof BaseView, "View must be a BaseView");
return (BaseView) view;
}

public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) {
return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build();
}

public static LoadViewResponse createView(
ViewCatalog catalog, Namespace namespace, CreateViewRequest request) {
request.validate();

ViewMetadata viewMetadata = request.metadata();
ViewBuilder viewBuilder =
catalog
.buildView(TableIdentifier.of(namespace, request.name()))
.withSchema(viewMetadata.schema())
.withProperties(viewMetadata.properties())
.withDefaultNamespace(viewMetadata.currentVersion().defaultNamespace())
.withDefaultCatalog(viewMetadata.currentVersion().defaultCatalog());
viewMetadata.currentVersion().representations().stream()
.filter(r -> r instanceof SQLViewRepresentation)
.map(r -> (SQLViewRepresentation) r)
.forEach(r -> viewBuilder.withQuery(r.dialect(), r.sql()));
View view = viewBuilder.create();

return viewResponse(view);
}

private static LoadViewResponse viewResponse(View view) {
ViewMetadata metadata = baseView(view).operations().current();
return ImmutableLoadViewResponse.builder()
.metadata(metadata)
.metadataLocation(metadata.metadataFileLocation())
.build();
}

public static LoadViewResponse loadView(ViewCatalog catalog, TableIdentifier viewIdentifier) {
View view = catalog.loadView(viewIdentifier);
return viewResponse(view);
}

public static LoadViewResponse updateView(
ViewCatalog catalog, TableIdentifier ident, UpdateTableRequest request) {
View view = catalog.loadView(ident);
ViewMetadata metadata = commit(baseView(view).operations(), request);

return ImmutableLoadViewResponse.builder()
.metadata(metadata)
.metadataLocation(metadata.metadataFileLocation())
.build();
}

public static void renameView(ViewCatalog catalog, RenameTableRequest request) {
catalog.renameView(request.source(), request.destination());
}

public static void dropView(ViewCatalog catalog, TableIdentifier viewIdentifier) {
boolean dropped = catalog.dropView(viewIdentifier);
if (!dropped) {
throw new NoSuchViewException("View does not exist: %s", viewIdentifier);
}
}

static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) {
AtomicBoolean isRetry = new AtomicBoolean(false);
try {
Tasks.foreach(ops)
.retry(COMMIT_NUM_RETRIES_DEFAULT)
.exponentialBackoff(
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
taskOps -> {
ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
isRetry.set(true);

// apply changes
ViewMetadata.Builder metadataBuilder = ViewMetadata.buildFrom(base);
request.updates().forEach(update -> update.applyTo(metadataBuilder));

ViewMetadata updated = metadataBuilder.build();

if (updated.changes().isEmpty()) {
// do not commit if the metadata has not changed
return;
}

// commit
taskOps.commit(base, updated);
});

} catch (ValidationFailureException e) {
throw e.wrapped();
}

return ops.current();
}
}
Loading

0 comments on commit 59bde7f

Please sign in to comment.