Skip to content

Commit

Permalink
Core: Add View support for REST catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Jun 28, 2023
1 parent 4568217 commit 1c9df82
Show file tree
Hide file tree
Showing 32 changed files with 2,159 additions and 31 deletions.
119 changes: 119 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/ViewSessionCatalog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

/** A session Catalog API for view create, drop, and load operations. */
public interface ViewSessionCatalog {

/**
* Return the name for this catalog.
*
* @return this catalog's name
*/
String name();

/**
* Return all the identifiers under this namespace.
*
* @param namespace a namespace
* @return a list of identifiers for views
* @throws NoSuchNamespaceException if the namespace is not found
*/
List<TableIdentifier> listViews(SessionCatalog.SessionContext context, Namespace namespace);

/**
* Load a view.
*
* @param identifier a view identifier
* @return instance of {@link View} implementation referred by the identifier
* @throws NoSuchViewException if the view does not exist
*/
View loadView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Check whether view exists.
*
* @param identifier a view identifier
* @return true if the view exists, false otherwise
*/
default boolean viewExists(SessionCatalog.SessionContext context, TableIdentifier identifier) {
try {
loadView(context, identifier);
return true;
} catch (NoSuchViewException e) {
return false;
}
}

/**
* Instantiate a builder to create or replace a SQL view.
*
* @param identifier a view identifier
* @return a view builder
*/
ViewBuilder buildView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Drop a view.
*
* @param identifier a view identifier
* @return true if the view was dropped, false if the view did not exist
*/
boolean dropView(SessionCatalog.SessionContext context, TableIdentifier identifier);

/**
* Rename a view.
*
* @param from identifier of the view to rename
* @param to new view identifier
* @throws NoSuchViewException if the "from" view does not exist
* @throws AlreadyExistsException if the "to" view already exists
*/
void renameView(SessionCatalog.SessionContext context, TableIdentifier from, TableIdentifier to);

/**
* Invalidate cached view metadata from current catalog.
*
* <p>If the view is already loaded or cached, drop cached data. If the view does not exist or is
* not cached, do nothing.
*
* @param identifier a view identifier
*/
default void invalidateView(SessionCatalog.SessionContext context, TableIdentifier identifier) {}

/**
* Initialize a view catalog given a custom name and a map of catalog properties.
*
* <p>A custom view catalog implementation must have a no-arg constructor. A compute engine like
* Spark or Flink will first initialize the catalog without any arguments, and then call this
* method to complete catalog initialization with properties passed into the engine.
*
* @param name a custom name for the catalog
* @param properties catalog properties
*/
default void initialize(String name, Map<String, String> properties) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public View create() {
ViewVersion viewVersion =
viewVersionBuilder
.versionId(1)
.addRepresentations(viewRepresentationBuilder.build())
// FIXME. can't use only withRepresentation(SQLRepresentation..)
// .addRepresentations(viewRepresentationBuilder.build())
.timestampMillis(timestampMillis)
.putSummary("operation", "create")
.build();
Expand Down
91 changes: 90 additions & 1 deletion core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.ImmutableViewMetadata;
import org.apache.iceberg.view.ViewVersion;

/** Represents a change to table metadata. */
/** Represents a change to table/view metadata. */
public interface MetadataUpdate extends Serializable {
void applyTo(TableMetadata.Builder metadataBuilder);

default void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {}

class AssignUUID implements MetadataUpdate {
private final String uuid;

Expand Down Expand Up @@ -60,6 +65,12 @@ public int formatVersion() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.upgradeFormatVersion(formatVersion);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.formatVersion(formatVersion);
viewMetadataBuilder.addChanges(new MetadataUpdate.UpgradeFormatVersion(formatVersion));
}
}

class AddSchema implements MetadataUpdate {
Expand All @@ -83,6 +94,12 @@ public int lastColumnId() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.addSchema(schema, lastColumnId);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.addSchemas(schema);
viewMetadataBuilder.addChanges(new MetadataUpdate.AddSchema(schema, lastColumnId));
}
}

class SetCurrentSchema implements MetadataUpdate {
Expand All @@ -100,6 +117,12 @@ public int schemaId() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setCurrentSchema(schemaId);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.currentSchemaId(schemaId);
viewMetadataBuilder.addChanges(new MetadataUpdate.SetCurrentSchema(schemaId));
}
}

class AddPartitionSpec implements MetadataUpdate {
Expand Down Expand Up @@ -343,6 +366,12 @@ public Map<String, String> updated() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setProperties(updated);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.putAllProperties(updated);
viewMetadataBuilder.addChanges(new MetadataUpdate.SetProperties(updated));
}
}

class RemoveProperties implements MetadataUpdate {
Expand All @@ -360,6 +389,14 @@ public Set<String> removed() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeProperties(removed);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
Map<String, String> properties = Maps.newHashMap(viewMetadataBuilder.build().properties());
removed.forEach(properties::remove);
viewMetadataBuilder.properties(properties);
viewMetadataBuilder.addChanges(new MetadataUpdate.RemoveProperties(removed));
}
}

class SetLocation implements MetadataUpdate {
Expand All @@ -377,5 +414,57 @@ public String location() {
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setLocation(location);
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.location(location);
viewMetadataBuilder.addChanges(new MetadataUpdate.SetLocation(location));
}
}

class AddViewVersion implements MetadataUpdate {
private final ViewVersion viewVersion;

public AddViewVersion(ViewVersion viewVersion) {
this.viewVersion = viewVersion;
}

public ViewVersion viewVersion() {
return viewVersion;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
// noop
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.addVersions(viewVersion);
viewMetadataBuilder.addChanges(new MetadataUpdate.AddViewVersion(viewVersion));
}
}

class SetCurrentViewVersion implements MetadataUpdate {
private final int versionId;

public SetCurrentViewVersion(int versionId) {
this.versionId = versionId;
}

public int versionId() {
return versionId;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
// noop
}

@Override
public void applyTo(ImmutableViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.currentVersionId(versionId);
viewMetadataBuilder.addChanges(new MetadataUpdate.SetCurrentViewVersion(versionId));
}
}
}
42 changes: 42 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.view.ViewVersionParser;

public class MetadataUpdateParser {

Expand All @@ -54,6 +55,8 @@ private MetadataUpdateParser() {}
static final String SET_LOCATION = "set-location";
static final String SET_STATISTICS = "set-statistics";
static final String REMOVE_STATISTICS = "remove-statistics";
static final String ADD_VIEW_VERSION = "add-view-version";
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -112,6 +115,12 @@ private MetadataUpdateParser() {}
// SetLocation
private static final String LOCATION = "location";

// AddViewVersion
private static final String VIEW_VERSION = "view-version";

// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -131,6 +140,8 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetProperties.class, SET_PROPERTIES)
.put(MetadataUpdate.RemoveProperties.class, REMOVE_PROPERTIES)
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -208,6 +219,13 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case SET_LOCATION:
writeSetLocation((MetadataUpdate.SetLocation) metadataUpdate, generator);
break;
case ADD_VIEW_VERSION:
writeAddViewVersion((MetadataUpdate.AddViewVersion) metadataUpdate, generator);
break;
case SET_CURRENT_VIEW_VERSION:
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -271,6 +289,10 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readRemoveProperties(jsonNode);
case SET_LOCATION:
return readSetLocation(jsonNode);
case ADD_VIEW_VERSION:
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -384,6 +406,17 @@ private static void writeSetLocation(MetadataUpdate.SetLocation update, JsonGene
gen.writeStringField(LOCATION, update.location());
}

private static void writeAddViewVersion(
MetadataUpdate.AddViewVersion metadataUpdate, JsonGenerator gen) throws IOException {
gen.writeFieldName(VIEW_VERSION);
ViewVersionParser.toJson(metadataUpdate.viewVersion(), gen);
}

private static void writeSetCurrentViewVersionId(
MetadataUpdate.SetCurrentViewVersion metadataUpdate, JsonGenerator gen) throws IOException {
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -512,4 +545,13 @@ private static MetadataUpdate readSetLocation(JsonNode node) {
String location = JsonUtil.getString(LOCATION, node);
return new MetadataUpdate.SetLocation(location);
}

private static MetadataUpdate readAddViewVersion(JsonNode node) {
return new MetadataUpdate.AddViewVersion(
ViewVersionParser.fromJson(JsonUtil.get(VIEW_VERSION, node)));
}

private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}
}
Loading

0 comments on commit 1c9df82

Please sign in to comment.