Skip to content

Commit

Permalink
Add iceberg table properties for maintenance of previous metadata files
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jul 19, 2024
1 parent 2fdbc69 commit 6d2a543
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 20 deletions.
14 changes: 13 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,13 @@ Property Name Description

Set to ``0`` to disable metadata optimization.

``iceberg.split-manager-threads`` Number of threads to use for generating Iceberg splits. ``Number of available processors``
``iceberg.split-manager-threads`` Number of threads to use for generating Iceberg splits. ``Number of available processors``

``iceberg.metadata-previous-versions-max`` The max number of previous metadata files recorded in ``100``
current metadata.

``iceberg.metadata-delete-after-commit`` Set to ``true`` to delete the oldest metadata files after ``false``
each commit.
======================================================= ============================================================= ============

Table Properties
Expand Down Expand Up @@ -355,6 +361,12 @@ Property Name Description
``delete_mode`` Optionally specifies the write delete mode of the Iceberg ``merge-on-read``
specification to use for new tables, either ``copy-on-write``
or ``merge-on-read``.

``metadata_previous_versions_max`` Optionally specifies the max number of previous metadata files ``100``
recorded in current metadata.

``metadata_delete_after_commit`` Set to ``true`` to delete the oldest metadata file after ``false``
each commit.
======================================= =============================================================== ============

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT;
import static com.facebook.presto.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
Expand Down Expand Up @@ -552,6 +554,8 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
}

properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable));
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));

return properties.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;

public class IcebergConfig
{
Expand All @@ -53,6 +55,8 @@ public class IcebergConfig
private boolean pushdownFilterEnabled;
private boolean deleteAsJoinRewriteEnabled = true;
private int rowsForMetadataOptimizationThreshold = 1000;
private int metadataPreviousVersionsMax = METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
private boolean metadataDeleteAfterCommit = METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
Expand Down Expand Up @@ -349,4 +353,31 @@ public IcebergConfig setSplitManagerThreads(int splitManagerThreads)
this.splitManagerThreads = splitManagerThreads;
return this;
}

@Min(0)
public int getMetadataPreviousVersionsMax()
{
return metadataPreviousVersionsMax;
}

@Config("iceberg.metadata-previous-versions-max")
@ConfigDescription("The max number of previous metadata files exist in metadata log")
public IcebergConfig setMetadataPreviousVersionsMax(int metadataPreviousVersionsMax)
{
this.metadataPreviousVersionsMax = metadataPreviousVersionsMax;
return this;
}

public boolean isMetadataDeleteAfterCommit()
{
return metadataDeleteAfterCommit;
}

@Config("iceberg.metadata-delete-after-commit")
@ConfigDescription("Whether enables to delete the oldest metadata file after commit")
public IcebergConfig setMetadataDeleteAfterCommit(boolean metadataDeleteAfterCommit)
{
this.metadataDeleteAfterCommit = metadataDeleteAfterCommit;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -40,6 +41,8 @@ public class IcebergTableProperties
public static final String FORMAT_VERSION = "format_version";
public static final String COMMIT_RETRIES = "commit_retries";
public static final String DELETE_MODE = "delete_mode";
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
private static final String DEFAULT_FORMAT_VERSION = "2";

private final List<PropertyMetadata<?>> tableProperties;
Expand Down Expand Up @@ -93,6 +96,16 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
false,
value -> RowLevelOperationMode.fromName((String) value),
RowLevelOperationMode::modeName))
.add(integerProperty(
METADATA_PREVIOUS_VERSIONS_MAX,
"The max number of previous metadata files exist in metadata log",
icebergConfig.getMetadataPreviousVersionsMax(),
false))
.add(booleanProperty(
METADATA_DELETE_AFTER_COMMIT,
"Whether enables to delete the oldest metadata file after commit",
icebergConfig.isMetadataDeleteAfterCommit(),
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
Expand Down Expand Up @@ -143,4 +156,14 @@ public static RowLevelOperationMode getDeleteMode(Map<String, Object> tablePrope
{
return (RowLevelOperationMode) tableProperties.get(DELETE_MODE);
}

public static Integer getMetadataPreviousVersionsMax(Map<String, Object> tableProperties)
{
return (Integer) tableProperties.get(METADATA_PREVIOUS_VERSIONS_MAX);
}

public static Boolean isMetadataDeleteAfterCommit(Map<String, Object> tableProperties)
{
return (Boolean) tableProperties.get(METADATA_DELETE_AFTER_COMMIT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
Expand Down Expand Up @@ -1079,6 +1083,11 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata
propertiesBuilder.put(DELETE_MODE, deleteMode.modeName());
}

Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties());
propertiesBuilder.put(METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax));

Boolean metadataDeleteAfterCommit = IcebergTableProperties.isMetadataDeleteAfterCommit(tableMetadata.getProperties());
propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit));
return propertiesBuilder.build();
}

Expand All @@ -1099,6 +1108,20 @@ public static RowLevelOperationMode getDeleteMode(Table table)
.toUpperCase(Locale.ENGLISH));
}

public static int getMetadataPreviousVersionsMax(Table table)
{
return Integer.parseInt(table.properties()
.getOrDefault(METADATA_PREVIOUS_VERSIONS_MAX,
String.valueOf(METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}

public static boolean isMetadataDeleteAfterCommit(Table table)
{
return Boolean.valueOf(table.properties()
.getOrDefault(METADATA_DELETE_AFTER_COMMIT_ENABLED,
String.valueOf(METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
}

public static Optional<PartitionData> partitionDataFromJson(PartitionSpec spec, Optional<String> partitionDataAsJson)
{
org.apache.iceberg.types.Type[] partitionColumnTypes = spec.fields().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public void testShowCreateTable()
" delete_mode = 'merge-on-read',\n" +
" format = 'PARQUET',\n" +
" format_version = '2',\n" +
" location = '%s'\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100\n" +
")", getLocation("tpch", "orders")));
}

Expand Down Expand Up @@ -418,6 +420,8 @@ private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat
" format = '" + fileFormat + "',\n" +
" format_version = '2',\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100,\n" +
" partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)']\n" +
")",
getSession().getCatalog().get(),
Expand Down Expand Up @@ -617,7 +621,9 @@ public void testTableComments()
" delete_mode = 'merge-on-read',\n" +
" format = 'ORC',\n" +
" format_version = '2',\n" +
" location = '%s'\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100\n" +
")";
String createTableSql = format(createTableTemplate, "test table comment", getLocation("tpch", "test_table_comments"));

Expand Down Expand Up @@ -706,6 +712,8 @@ private void testCreateTableLike()
" format = 'PARQUET',\n" +
" format_version = '2',\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100,\n" +
" partitioning = ARRAY['adate']\n" +
")", getLocation("tpch", "test_create_table_like_original")));

Expand All @@ -719,7 +727,9 @@ private void testCreateTableLike()
" delete_mode = 'merge-on-read',\n" +
" format = 'PARQUET',\n" +
" format_version = '2',\n" +
" location = '%s'\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100\n" +
")", getLocation("tpch", "test_create_table_like_copy1")));
dropTable(session, "test_create_table_like_copy1");

Expand All @@ -728,7 +738,9 @@ private void testCreateTableLike()
" delete_mode = 'merge-on-read',\n" +
" format = 'PARQUET',\n" +
" format_version = '2',\n" +
" location = '%s'\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100\n" +
")", getLocation("tpch", "test_create_table_like_copy2")));
dropTable(session, "test_create_table_like_copy2");

Expand All @@ -738,6 +750,8 @@ private void testCreateTableLike()
" format = 'PARQUET',\n" +
" format_version = '2',\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100,\n" +
" partitioning = ARRAY['adate']\n" +
")", catalogType.equals(CatalogType.HIVE) ?
getLocation("tpch", "test_create_table_like_original") :
Expand All @@ -750,6 +764,8 @@ private void testCreateTableLike()
" format = 'ORC',\n" +
" format_version = '2',\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100,\n" +
" partitioning = ARRAY['adate']\n" +
")", catalogType.equals(CatalogType.HIVE) ?
getLocation("tpch", "test_create_table_like_original") :
Expand Down Expand Up @@ -791,7 +807,9 @@ protected void testCreateTableWithFormatVersion(String formatVersion, String def
" delete_mode = '%s',\n" +
" format = 'PARQUET',\n" +
" format_version = '%s',\n" +
" location = '%s'\n" +
" location = '%s',\n" +
" metadata_delete_after_commit = false,\n" +
" metadata_previous_versions_max = 100\n" +
")",
getSession().getCatalog().get(),
getSession().getSchema().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,49 @@ public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDelete
}
}

@Test
public void testMetadataVersionsMaintainingProperties()
throws Exception
{
String settingTableName = "test_table_with_setting_properties";
String defaultTableName = "test_table_with_default_setting_properties";
try {
// Create a table with setting properties that maintain only 1 previous metadata version in current metadata,
// and delete unuseful metadata files after each commit
assertUpdate("CREATE TABLE " + settingTableName + " (a INTEGER, b VARCHAR)" +
" WITH (metadata_previous_versions_max = 1, metadata_delete_after_commit = true)");

// Create a table with default table properties that maintain 100 previous metadata versions in current metadata,
// and do not automatically delete any metadata files
assertUpdate("CREATE TABLE " + defaultTableName + " (a INTEGER, b VARCHAR)");

assertUpdate("INSERT INTO " + settingTableName + " VALUES (1, '1001'), (2, '1002')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (3, '1003'), (4, '1004')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (5, '1005'), (6, '1006')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (7, '1007'), (8, '1008')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (9, '1009'), (10, '1010')", 2);

assertUpdate("INSERT INTO " + defaultTableName + " VALUES (1, '1001'), (2, '1002')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (3, '1003'), (4, '1004')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (5, '1005'), (6, '1006')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (7, '1007'), (8, '1008')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (9, '1009'), (10, '1010')", 2);

Table settingTable = loadTable(settingTableName);
TableMetadata settingTableMetadata = ((BaseTable) settingTable).operations().current();
// Table `test_table_with_setting_properties`'s current metadata only record 1 previous metadata file
assertEquals(settingTableMetadata.previousFiles().size(), 1);

Table defaultTable = loadTable(defaultTableName);
TableMetadata defaultTableMetadata = ((BaseTable) defaultTable).operations().current();
// Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files
assertEquals(defaultTableMetadata.previousFiles().size(), 5);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + settingTableName);
}
}

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
Expand Down
Loading

0 comments on commit 6d2a543

Please sign in to comment.