diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java index 0eff5fdb4a607..82b54b68cd071 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java @@ -32,13 +32,16 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Sets; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; @@ -52,6 +55,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -315,6 +319,7 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) // attempt to put back previous table statistics metastore.updateTableStatistics(metastoreContext, database, tableName, oldStats -> tableStats); } + deleteRemovedMetadataFiles(base, metadata); } finally { shouldRefresh = true; @@ -450,4 +455,39 @@ private static int parseVersion(String metadataLocation) return -1; } } + + /** + * Deletes the oldest metadata files if {@link + * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true. + * + * @param base table metadata on which previous versions were based + * @param metadata new table metadata with updated previous versions + */ + private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) + { + if (base == null) { + return; + } + + boolean deleteAfterCommit = + metadata.propertyAsBoolean( + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT); + + if (deleteAfterCommit) { + Set removedPreviousMetadataFiles = + Sets.newHashSet(base.previousFiles()); + // TableMetadata#addPreviousFile builds up the metadata log and uses + // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in + // the log, thus we don't include metadata.previousFiles() for deletion - everything else can + // be removed + removedPreviousMetadataFiles.removeAll(metadata.previousFiles()); + Tasks.foreach(removedPreviousMetadataFiles) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((previousMetadataFile, exc) -> + log.warn("Delete failed for previous metadata file: %s", previousMetadataFile, exc)) + .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); + } + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 0d6c02f4f030d..026c4bab55f7c 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -48,6 +48,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; @@ -128,6 +129,7 @@ public abstract class IcebergDistributedTestBase extends AbstractTestDistributedQueries { + private static final String METADATA_FILE_EXTENSION = ".metadata.json"; private final CatalogType catalogType; private final Map extraConnectorProperties; @@ -1610,6 +1612,16 @@ public void testMetadataVersionsMaintainingProperties() TableMetadata defaultTableMetadata = ((BaseTable) defaultTable).operations().current(); // Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files assertEquals(defaultTableMetadata.previousFiles().size(), 5); + + FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location())); + + // Table `test_table_with_setting_properties`'s all existing metadata files count is 2 + FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + assertEquals(settingTableFiles.length, 2); + + // Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6 + FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + assertEquals(defaultTableFiles.length, 6); } finally { assertUpdate("DROP TABLE IF EXISTS " + settingTableName);