Skip to content

Commit

Permalink
Nessie: Minor Refactoring of NessieTableOperations (#7893)
Browse files Browse the repository at this point in the history
* Nessie: Minor Refactoring of NessieTableOperations

* Handle comments

* Handle IO outisde the NessieUtil
  • Loading branch information
ajantha-bhat authored Jun 27, 2023
1 parent c96edfc commit dd0ee24
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ public NessieApiV1 getApi() {
return api;
}

public UpdateableReference getRef() {
UpdateableReference getRef() {
return reference.get();
}

public Reference getReference() {
return reference.get().getReference();
}

public void refresh() throws NessieNotFoundException {
getRef().refresh(api);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,15 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
Expand Down Expand Up @@ -81,47 +78,6 @@ protected String tableName() {
return key.toString();
}

private TableMetadata loadTableMetadata(String metadataLocation, Reference reference) {
// Update the TableMetadata with the Content of NessieTableState.
TableMetadata deserialized = TableMetadataParser.read(io(), metadataLocation);
Map<String, String> newProperties = Maps.newHashMap(deserialized.properties());
newProperties.put(NESSIE_COMMIT_ID_PROPERTY, reference.getHash());
// To prevent accidental deletion of files that are still referenced by other branches/tags,
// setting GC_ENABLED to false. So that all Iceberg's gc operations like expire_snapshots,
// remove_orphan_files, drop_table with purge will fail with an error.
// Nessie CLI will provide a reference aware GC functionality for the expired/unreferenced
// files.
newProperties.put(TableProperties.GC_ENABLED, "false");

boolean metadataCleanupEnabled =
newProperties
.getOrDefault(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "false")
.equalsIgnoreCase("true");
if (metadataCleanupEnabled) {
newProperties.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "false");
LOG.warn(
"Automatic table metadata files cleanup was requested, but disabled because "
+ "the Nessie catalog can use historical metadata files from other references. "
+ "Use the 'nessie-gc' tool for history-aware GC");
}

TableMetadata.Builder builder =
TableMetadata.buildFrom(deserialized)
.setPreviousFileLocation(null)
.setCurrentSchema(table.getSchemaId())
.setDefaultSortOrder(table.getSortOrderId())
.setDefaultPartitionSpec(table.getSpecId())
.withMetadataLocation(metadataLocation)
.setProperties(newProperties);
if (table.getSnapshotId() != -1) {
builder.setBranchSnapshot(table.getSnapshotId(), SnapshotRef.MAIN_BRANCH);
}
LOG.info(
"loadTableMetadata for '{}' from location '{}' at '{}'", key, metadataLocation, reference);

return builder.discardChanges().build();
}

@Override
protected void doRefresh() {
try {
Expand Down Expand Up @@ -159,7 +115,17 @@ protected void doRefresh() {
throw new NoSuchTableException(ex, "No such table '%s'", key);
}
}
refreshFromMetadataLocation(metadataLocation, null, 2, l -> loadTableMetadata(l, reference));
refreshFromMetadataLocation(
metadataLocation,
null,
2,
location ->
NessieUtil.updateTableMetadataWithNessieSpecificProperties(
TableMetadataParser.read(fileIO, location),
location,
table,
key.toString(),
reference));
}

@Override
Expand Down
58 changes: 58 additions & 0 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,26 @@
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
import org.projectnessie.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NessieUtil {

private static final Logger LOG = LoggerFactory.getLogger(NessieUtil.class);

public static final String NESSIE_CONFIG_PREFIX = "nessie.";
static final String APPLICATION_TYPE = "application-type";

Expand Down Expand Up @@ -91,4 +101,52 @@ private static String commitAuthor(Map<String, String> catalogOptions) {
return Optional.ofNullable(catalogOptions.get(CatalogProperties.USER))
.orElseGet(() -> System.getProperty("user.name"));
}

public static TableMetadata updateTableMetadataWithNessieSpecificProperties(
TableMetadata tableMetadata,
String metadataLocation,
IcebergTable table,
String identifier,
Reference reference) {
// Update the TableMetadata with the Content of NessieTableState.
Map<String, String> newProperties = Maps.newHashMap(tableMetadata.properties());
newProperties.put(NessieTableOperations.NESSIE_COMMIT_ID_PROPERTY, reference.getHash());
// To prevent accidental deletion of files that are still referenced by other branches/tags,
// setting GC_ENABLED to false. So that all Iceberg's gc operations like expire_snapshots,
// remove_orphan_files, drop_table with purge will fail with an error.
// Nessie CLI will provide a reference aware GC functionality for the expired/unreferenced
// files.
newProperties.put(TableProperties.GC_ENABLED, "false");

boolean metadataCleanupEnabled =
newProperties
.getOrDefault(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "false")
.equalsIgnoreCase("true");
if (metadataCleanupEnabled) {
newProperties.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "false");
LOG.warn(
"Automatic table metadata files cleanup was requested, but disabled because "
+ "the Nessie catalog can use historical metadata files from other references. "
+ "Use the 'nessie-gc' tool for history-aware GC");
}

TableMetadata.Builder builder =
TableMetadata.buildFrom(tableMetadata)
.setPreviousFileLocation(null)
.setCurrentSchema(table.getSchemaId())
.setDefaultSortOrder(table.getSortOrderId())
.setDefaultPartitionSpec(table.getSpecId())
.withMetadataLocation(metadataLocation)
.setProperties(newProperties);
if (table.getSnapshotId() != -1) {
builder.setBranchSnapshot(table.getSnapshotId(), SnapshotRef.MAIN_BRANCH);
}
LOG.info(
"loadTableMetadata for '{}' from location '{}' at '{}'",
identifier,
metadataLocation,
reference);

return builder.discardChanges().build();
}
}

0 comments on commit dd0ee24

Please sign in to comment.