Skip to content

Commit

Permalink
Updates due to new Builder API
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Aug 3, 2023
1 parent dc7c959 commit 74f4d00
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 54 deletions.
64 changes: 27 additions & 37 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
import org.apache.iceberg.view.ImmutableViewHistoryEntry;
import org.apache.iceberg.view.ImmutableViewMetadata;
import org.apache.iceberg.view.ImmutableViewVersion;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
Expand Down Expand Up @@ -354,8 +352,8 @@ public ViewBuilder buildView(TableIdentifier identifier) {
protected class BaseViewBuilder implements ViewBuilder {
private final TableIdentifier identifier;
private final ImmutableViewVersion.Builder viewVersionBuilder = ImmutableViewVersion.builder();
private final ImmutableViewMetadata.Builder builder = ImmutableViewMetadata.builder();
private final List<ViewRepresentation> viewRepresentations = Lists.newArrayList();
private Schema schema;
private final Map<String, String> properties = Maps.newHashMap();

public BaseViewBuilder(TableIdentifier identifier) {
Expand All @@ -365,9 +363,9 @@ public BaseViewBuilder(TableIdentifier identifier) {
}

@Override
public ViewBuilder withSchema(Schema schema) {
builder.addSchemas(schema).currentSchemaId(schema.schemaId());
viewVersionBuilder.schemaId(schema.schemaId());
public ViewBuilder withSchema(Schema newSchema) {
this.schema = newSchema;
viewVersionBuilder.schemaId(newSchema.schemaId());
return this;
}

Expand Down Expand Up @@ -419,19 +417,18 @@ public View create() {
.putSummary("operation", "create")
.build();

ViewMetadata viewMetadata =
builder
.properties(properties)
.location(defaultWarehouseLocation(identifier))
.formatVersion(1)
.currentVersionId(viewVersion.versionId())
.addVersions(viewVersion)
.addHistory(
ImmutableViewHistoryEntry.builder()
.timestampMillis(timestampMillis)
.versionId(viewVersion.versionId())
.build())
.build();
ViewMetadata.Builder builder =
ViewMetadata.builder()
.setProperties(properties)
.setLocation(defaultWarehouseLocation(identifier))
.setCurrentVersionId(viewVersion.versionId())
.addVersion(viewVersion);

if (null != schema) {
builder.addSchema(schema).setCurrentSchemaId(schema.schemaId());
}

ViewMetadata viewMetadata = builder.build();

try {
ops.commit(null, viewMetadata);
Expand Down Expand Up @@ -461,24 +458,17 @@ public View replace() {
.putSummary("operation", "replace")
.build();

// TODO: does it matter that new schema isn't added at the end of the schema list?
ViewMetadata replacement =
builder
.putAllProperties(metadata.properties())
.putAllProperties(properties)
.formatVersion(metadata.formatVersion())
.location(metadata.location())
.addAllSchemas(metadata.schemas())
.currentVersionId(viewVersion.versionId())
.addAllVersions(metadata.versions())
.addVersions(viewVersion)
.addAllHistory(metadata.history())
.addHistory(
ImmutableViewHistoryEntry.builder()
.timestampMillis(timestampMillis)
.versionId(viewVersion.versionId())
.build())
.build();
ViewMetadata.Builder builder =
ViewMetadata.buildFrom(metadata)
.setProperties(properties)
.setCurrentVersionId(viewVersion.versionId())
.addVersion(viewVersion);

if (null != schema) {
builder.addSchema(schema).setCurrentSchemaId(schema.schemaId());
}

ViewMetadata replacement = builder.build();

try {
ops.commit(metadata, replacement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ public void commit() {
taskOps -> {
Map<String, String> newProperties = apply();
ViewMetadata updated =
ImmutableViewMetadata.builder().from(base).properties(newProperties).build();
ViewMetadata.buildFrom(base)
.setProperties(newProperties)
.removeProperties(removals)
.build();
taskOps.commit(base, updated);
});
}
Expand Down
16 changes: 5 additions & 11 deletions core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,13 @@ public void commit() {
return;
}

ImmutableViewMetadata.Builder builder =
ImmutableViewMetadata.builder()
.from(this.base)
.currentVersionId(newVersion.versionId())
.addVersions(newVersion)
.addHistory(
ImmutableViewHistoryEntry.builder()
.versionId(newVersion.versionId())
.timestampMillis(newVersion.timestampMillis())
.build());
ViewMetadata.Builder builder =
ViewMetadata.buildFrom(this.base)
.setCurrentVersionId(newVersion.versionId())
.addVersion(newVersion);

if (null != schema) {
builder.addSchemas(schema).currentSchemaId(schema.schemaId());
builder.addSchema(schema).setCurrentSchemaId(schema.schemaId());
}

taskOps.commit(base, builder.build());
Expand Down
26 changes: 21 additions & 5 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void basicCreateView() {
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand All @@ -92,7 +93,7 @@ public void basicCreateView() {
assertThat(viewVersion.summary()).hasSize(1).containsEntry("operation", "create");
assertThat(viewVersion.operation()).isEqualTo("create");
assertThat(viewVersion.defaultCatalog()).isNull();
assertThat(viewVersion.defaultNamespace()).isNull();
assertThat(viewVersion.defaultNamespace()).isEqualTo(identifier.namespace());
assertThat(viewVersion.representations())
.hasSize(1)
.containsExactly(
Expand Down Expand Up @@ -181,6 +182,7 @@ public void createViewThatAlreadyExists() {
catalog()
.buildView(viewIdentifier)
.withSchema(SCHEMA)
.withDefaultNamespace(viewIdentifier.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand All @@ -207,6 +209,7 @@ public void createViewThatAlreadyExists() {
catalog()
.buildView(tableIdentifier)
.withSchema(OTHER_SCHEMA)
.withDefaultNamespace(tableIdentifier.namespace())
.withQuery("spark", "select * from ns.tbl")
.create())
.isInstanceOf(AlreadyExistsException.class)
Expand All @@ -225,6 +228,7 @@ public void renameView() {
catalog()
.buildView(from)
.withSchema(SCHEMA)
.withDefaultNamespace(from.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand Down Expand Up @@ -259,7 +263,7 @@ public void renameView() {
assertThat(viewVersion.summary()).hasSize(1).containsEntry("operation", "create");
assertThat(viewVersion.operation()).isEqualTo("create");
assertThat(viewVersion.defaultCatalog()).isNull();
assertThat(viewVersion.defaultNamespace()).isNull();
assertThat(viewVersion.defaultNamespace()).isEqualTo(to.namespace());
assertThat(viewVersion.representations())
.hasSize(1)
.containsExactly(
Expand All @@ -285,6 +289,7 @@ public void renameViewUsingDifferentNamespace() {
catalog()
.buildView(from)
.withSchema(SCHEMA)
.withDefaultNamespace(from.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand Down Expand Up @@ -378,6 +383,7 @@ public void renameViewTargetAlreadyExists() {
catalog()
.buildView(viewIdentifier)
.withSchema(SCHEMA)
.withDefaultNamespace(from.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();
}
Expand Down Expand Up @@ -419,7 +425,8 @@ public void listViews() {
catalog()
.buildView(view1)
.withSchema(SCHEMA)
.withQuery("spark", "select * from ns.tbl")
.withDefaultNamespace(view1.namespace())
.withQuery("spark", "select * from ns1.tbl")
.create();

assertThat(catalog().listViews(ns1)).containsExactly(view1);
Expand All @@ -428,7 +435,8 @@ public void listViews() {
catalog()
.buildView(view2)
.withSchema(SCHEMA)
.withQuery("spark", "select * from ns.tbl")
.withDefaultNamespace(view2.namespace())
.withQuery("spark", "select * from ns1.tbl")
.create();

assertThat(catalog().listViews(ns1)).containsExactly(view1);
Expand All @@ -437,6 +445,7 @@ public void listViews() {
catalog()
.buildView(view3)
.withSchema(SCHEMA)
.withDefaultNamespace(view3.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand Down Expand Up @@ -509,6 +518,7 @@ public void createOrReplaceView(boolean useCreateOrReplace) {
catalog()
.buildView(identifier)
.withSchema(OTHER_SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select count(*) from ns.tbl")
.withProperty("replacedProp1", "val1")
.withProperty("replacedProp2", "val2");
Expand Down Expand Up @@ -574,6 +584,7 @@ public void updateViewProperties() {
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("spark", "select * from ns.tbl")
.create();

Expand Down Expand Up @@ -691,6 +702,7 @@ public void replaceViewVersion() {
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery(trino.dialect(), trino.sql())
.withQuery(spark.dialect(), spark.sql())
.create();
Expand Down Expand Up @@ -756,7 +768,10 @@ public void replaceViewVersion() {
.dialect("spark")
.build();

view.replaceVersion().withQuery(updatedSpark.dialect(), updatedSpark.sql()).commit();
view.replaceVersion()
.withQuery(updatedSpark.dialect(), updatedSpark.sql())
.withDefaultNamespace(identifier.namespace())
.commit();

View updatedView2 = catalog().loadView(identifier);
assertThat(updatedView2.properties()).isEmpty();
Expand Down Expand Up @@ -812,6 +827,7 @@ public void replaceViewVersionEmptyCommit() {
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery(trino.dialect(), trino.sql())
.create();

Expand Down

0 comments on commit 74f4d00

Please sign in to comment.