Skip to content

Commit

Permalink
Support delete old metadata files after commit for HiveTableOperations
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jul 19, 2024
1 parent a16a5f9 commit e7c204e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
Expand All @@ -52,6 +55,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -315,6 +319,7 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
// attempt to put back previous table statistics
metastore.updateTableStatistics(metastoreContext, database, tableName, oldStats -> tableStats);
}
deleteRemovedMetadataFiles(base, metadata);
}
finally {
shouldRefresh = true;
Expand Down Expand Up @@ -450,4 +455,39 @@ private static int parseVersion(String metadataLocation)
return -1;
}
}

/**
 * Deletes the metadata files that were dropped from the metadata log by this commit, if
 * {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true on the new metadata.
 *
 * <p>The files to delete are the entries of {@code base.previousFiles()} that no longer
 * appear in {@code metadata.previousFiles()}. Each deletion is attempted once (no retry);
 * a failed deletion is reported through the failure callback and logged as a warning.
 *
 * @param base table metadata on which previous versions were based; when {@code null}
 *     this method does nothing
 * @param metadata new table metadata with updated previous versions
 */
private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata)
{
    if (base == null) {
        return;
    }

    boolean deleteAfterCommit = metadata.propertyAsBoolean(
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
    if (!deleteAfterCommit) {
        return;
    }

    // TableMetadata#addPreviousFile builds up the metadata log and uses
    // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
    // the log, thus we don't include metadata.previousFiles() for deletion - everything else can
    // be removed
    Set<MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
    removedPreviousMetadataFiles.removeAll(metadata.previousFiles());

    Tasks.foreach(removedPreviousMetadataFiles)
            .noRetry()
            .suppressFailureWhenFinished()
            // Pass the throwable as the leading argument so the logger records the cause and
            // stack trace. Appended after the single "%s" value it would be a surplus format
            // argument and be silently discarded.
            // NOTE(review): assumes `log` is the airlift-style Logger exposing
            // warn(Throwable, String, Object...) - confirm against the field declaration.
            .onFailure((previousMetadataFile, exc) ->
                    log.warn(exc, "Delete failed for previous metadata file: %s", previousMetadataFile))
            .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
Expand Down Expand Up @@ -128,6 +129,7 @@
public abstract class IcebergDistributedTestBase
extends AbstractTestDistributedQueries
{
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
private final CatalogType catalogType;
private final Map<String, String> extraConnectorProperties;

Expand Down Expand Up @@ -1610,6 +1612,16 @@ public void testMetadataVersionsMaintainingProperties()
TableMetadata defaultTableMetadata = ((BaseTable) defaultTable).operations().current();
// The current metadata of table `test_table_with_default_setting_properties` records all 5 previous metadata files
assertEquals(defaultTableMetadata.previousFiles().size(), 5);

FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location()));

// Table `test_table_with_setting_properties` has exactly 2 metadata files remaining in its metadata directory
FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(settingTableFiles.length, 2);

// Table `test_table_with_default_setting_properties` has exactly 6 metadata files in its metadata directory
FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(defaultTableFiles.length, 6);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + settingTableName);
Expand Down

0 comments on commit e7c204e

Please sign in to comment.