[Iceberg] Add table refresh retry configurations
ZacBlanco committed Jul 12, 2024
1 parent d76c2d3 commit f3f060f
Showing 12 changed files with 282 additions and 50 deletions.
98 changes: 63 additions & 35 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -12,8 +12,8 @@ Metastores
Iceberg tables store most of their metadata in metadata files, alongside the data on the
filesystem, but they still require a central place to find the location of the
current metadata pointer for a table. This central place is called the ``Iceberg Catalog``.
The Presto Iceberg connector supports different types of Iceberg Catalogs : ``Hive Metastore``,
``GLUE``, ``NESSIE``, ``REST`` and ``HADOOP``.
The Presto Iceberg connector supports different types of Iceberg Catalogs: ``HIVE``,
``NESSIE``, ``REST``, and ``HADOOP``.

To configure the Iceberg connector, create a catalog properties file
``etc/catalog/iceberg.properties``. To define the catalog type, ``iceberg.catalog.type`` property
@@ -45,6 +45,48 @@ as a Hive connector.
hive.metastore=glue
iceberg.catalog.type=hive
There are additional configurations available when using the Iceberg connector configured with Hive
or Glue catalogs.

======================================================== ============================================================= ============
Property Name                                            Description                                                   Default
======================================================== ============================================================= ============
``hive.metastore.uri``                                   The URI(s) of the Hive metastore to connect to using the
                                                         Thrift protocol. If multiple URIs are provided, the first
                                                         URI is used by default, and the rest of the URIs are
                                                         fallback metastores.

                                                         Example: ``thrift://192.0.2.3:9083`` or
                                                         ``thrift://192.0.2.3:9083,thrift://192.0.2.4:9083``.

                                                         This property is required if the
                                                         ``iceberg.catalog.type`` is ``hive`` and ``hive.metastore``
                                                         is ``thrift``.

``iceberg.hive-statistics-merge-strategy``               Comma separated list of statistics to use from the
                                                         Hive Metastore to override Iceberg table statistics.
                                                         The available values are ``NUMBER_OF_DISTINCT_VALUES``
                                                         and ``TOTAL_SIZE_IN_BYTES``.

                                                         **Note**: Only valid when the Iceberg connector is
                                                         configured with Hive.

``iceberg.hive.table-refresh.backoff-min-sleep-time``    The minimum amount of time to sleep between retries when      100ms
                                                         refreshing table metadata.

``iceberg.hive.table-refresh.backoff-max-sleep-time``    The maximum amount of time to sleep between retries when      5s
                                                         refreshing table metadata.

``iceberg.hive.table-refresh.max-retry-time``            The maximum amount of time to take across all retries before  1min
                                                         failing a table metadata refresh operation.

``iceberg.hive.table-refresh.retries``                   The number of times to retry after errors when refreshing     20
                                                         table metadata using the Hive metastore.

``iceberg.hive.table-refresh.backoff-scale-factor``      The multiple used to scale subsequent wait time between       4.0
                                                         retries.
======================================================== ============================================================= ============
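
As a rough illustration of how the five ``iceberg.hive.table-refresh.*`` properties and their documented defaults fit together, the sketch below reads them from catalog properties text using only the JDK. It is not Presto's ``IcebergHiveTableOperationsConfig`` (that class is part of this commit but its source is not shown here); ``TableRefreshSettings``, ``fromProperties``, and ``parseMillis`` are hypothetical names, and the duration parser is an assumption that handles only whole-number ``ms``, ``s``, and ``min`` values.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the table-refresh settings; not Presto's actual config class.
final class TableRefreshSettings
{
    private static final String PREFIX = "iceberg.hive.table-refresh.";

    final long minSleepMs;
    final long maxSleepMs;
    final long maxRetryTimeMs;
    final int retries;
    final double scaleFactor;

    private TableRefreshSettings(Properties props)
    {
        // Fallback values are the defaults listed in the documentation table.
        minSleepMs = parseMillis(props.getProperty(PREFIX + "backoff-min-sleep-time", "100ms"));
        maxSleepMs = parseMillis(props.getProperty(PREFIX + "backoff-max-sleep-time", "5s"));
        maxRetryTimeMs = parseMillis(props.getProperty(PREFIX + "max-retry-time", "1min"));
        retries = Integer.parseInt(props.getProperty(PREFIX + "retries", "20").trim());
        scaleFactor = Double.parseDouble(props.getProperty(PREFIX + "backoff-scale-factor", "4.0").trim());
    }

    // Parses java.util.Properties syntax, e.g. the contents of etc/catalog/iceberg.properties.
    static TableRefreshSettings fromProperties(String text)
    {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new TableRefreshSettings(props);
    }

    // Simplified duration parser (assumption): whole numbers with an ms, s, or min suffix.
    static long parseMillis(String value)
    {
        String v = value.trim();
        if (v.endsWith("ms")) {
            return Long.parseLong(v.substring(0, v.length() - 2).trim());
        }
        if (v.endsWith("min")) {
            return Long.parseLong(v.substring(0, v.length() - 3).trim()) * 60_000L;
        }
        if (v.endsWith("s")) {
            return Long.parseLong(v.substring(0, v.length() - 1).trim()) * 1_000L;
        }
        throw new IllegalArgumentException("Unsupported duration: " + value);
    }

    public static void main(String[] args)
    {
        TableRefreshSettings settings = fromProperties(
                "iceberg.catalog.type=hive\n" +
                "iceberg.hive.table-refresh.retries=5\n" +
                "iceberg.hive.table-refresh.backoff-max-sleep-time=10s\n");
        System.out.println(settings.retries + " retries, max sleep " + settings.maxSleepMs + " ms");
    }
}
```

Properties that are not set fall back to the documented defaults, so a catalog file only needs to list the values it wants to change.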

Nessie catalog
^^^^^^^^^^^^^^

@@ -194,35 +236,11 @@ To use a Hadoop catalog, configure the catalog type as
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://hostname:port
Configuration Properties
------------------------

.. note::

The Iceberg connector supports configuration options for
`Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
as a Hive connector.

The following configuration properties are available:
Hadoop catalog configuration properties:

======================================================= ============================================================= ============
Property Name Description Default
======================================================= ============================================================= ============
``hive.metastore.uri`` The URI(s) of the Hive metastore to connect to using the
Thrift protocol. If multiple URIs are provided, the first
URI is used by default, and the rest of the URIs are
fallback metastores.

Example: ``thrift://192.0.2.3:9083`` or
``thrift://192.0.2.3:9083,thrift://192.0.2.4:9083``.

This property is required if the
``iceberg.catalog.type`` is ``hive``. Otherwise, it will
be ignored.

``iceberg.catalog.type`` The catalog type for Iceberg tables. The available values ``hive``
are ``hive``, ``hadoop``, and ``nessie``.

``iceberg.catalog.warehouse`` The catalog warehouse root path for Iceberg tables.

Example: ``hdfs://nn:8020/warehouse/path``
@@ -232,6 +250,24 @@ Property Name Description
``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
Otherwise, it will be ignored.
======================================================= ============================================================= ============

Configuration Properties
------------------------

.. note::

The Iceberg connector supports configuration options for
`Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
as a Hive connector.

The following configuration properties are available for all catalog types:

======================================================= ============================================================= ============
Property Name Description Default
======================================================= ============================================================= ============
``iceberg.catalog.type`` The catalog type for Iceberg tables. The available values ``HIVE``
are ``HIVE``, ``HADOOP``, ``NESSIE``, and ``REST``.

``iceberg.hadoop.config.resources`` The path(s) for Hadoop configuration resources.

@@ -262,14 +298,6 @@ Property Name Description

``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true``

``iceberg.hive-statistics-merge-strategy`` Comma separated list of statistics to use from the
Hive Metastore to override Iceberg table statistics.
The available values are ``NUMBER_OF_DISTINCT_VALUES``
and ``TOTAL_SIZE_IN_BYTES``.

**Note**: Only valid when the Iceberg connector is
configured with Hive.

``iceberg.statistic-snapshot-record-difference-weight`` The amount that the difference in total record count matters
when calculating the closest snapshot when picking
statistics. A value of 1 means a single record is equivalent

@@ -101,6 +101,7 @@ public class HiveTableOperations
private final Optional<String> owner;
private final Optional<String> location;
private final FileIO fileIO;
private final IcebergHiveTableOperationsConfig config;

private TableMetadata currentMetadata;
private String currentMetadataLocation;
@@ -114,12 +115,14 @@ public HiveTableOperations(
MetastoreContext metastoreContext,
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
String database,
String table)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
database,
table,
Optional.empty(),
@@ -131,6 +134,7 @@ public HiveTableOperations(
MetastoreContext metastoreContext,
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
String database,
String table,
String owner,
@@ -139,6 +143,7 @@ public HiveTableOperations(
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
database,
table,
Optional.of(requireNonNull(owner, "owner is null")),
@@ -149,6 +154,7 @@ private HiveTableOperations(
FileIO fileIO,
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
String database,
String table,
Optional<String> owner,
@@ -161,6 +167,7 @@ private HiveTableOperations(
this.tableName = requireNonNull(table, "table is null");
this.owner = requireNonNull(owner, "owner is null");
this.location = requireNonNull(location, "location is null");
this.config = requireNonNull(config, "config is null");
//TODO: duration from config
initTableLevelLockCache(TimeUnit.MINUTES.toMillis(10));
}
@@ -385,15 +392,23 @@ private void refreshFromMetadataLocation(String newLocation)
}

AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(20)
.exponentialBackoff(100, 5000, 600000, 4.0)
.suppressFailureWhenFinished()
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
try {
Tasks.foreach(newLocation)
.retry(config.getTableRefreshRetries())
.exponentialBackoff(
config.getTableRefreshBackoffMinSleepTime().toMillis(),
config.getTableRefreshBackoffMaxSleepTime().toMillis(),
config.getTableRefreshMaxRetryTime().toMillis(),
config.getTableRefreshBackoffScaleFactor())
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
}
catch (RuntimeException e) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
}

if (newMetadata.get() == null) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing.");
throw new TableNotFoundException(getSchemaTableName(), "failed to retrieve table metadata from " + newLocation);
}

String newUUID = newMetadata.get().uuid();
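
To make the effect of the retry settings in this hunk concrete, here is a minimal, self-contained sketch of a retry loop with capped exponential backoff. It is not Iceberg's ``Tasks`` implementation: ``RefreshRetrySketch``, ``delayMs``, and ``runWithRetries`` are hypothetical names, any jitter the real utility may add is left out, the time budget is approximated by the sum of sleeps rather than wall-clock time, and ``IllegalStateException`` stands in for ``TableNotFoundException``. With the documented defaults (100ms minimum, 5s maximum, scale factor 4.0) the delays grow as 100, 400, 1600 and then stay at 5000 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical illustration of capped exponential backoff; not the Iceberg Tasks utility.
final class RefreshRetrySketch
{
    private RefreshRetrySketch() {}

    // Delay before the next try after failed attempt number `attempt` (1-based), capped at maxSleepMs.
    static long delayMs(int attempt, long minSleepMs, long maxSleepMs, double scaleFactor)
    {
        return (long) Math.min(minSleepMs * Math.pow(scaleFactor, attempt - 1), maxSleepMs);
    }

    // Runs the task once, then retries up to `retries` times while the summed sleep time
    // stays within maxTotalMs. The last failure is wrapped and rethrown instead of suppressed.
    static <T> T runWithRetries(
            Supplier<T> task,
            int retries,
            long minSleepMs,
            long maxSleepMs,
            long maxTotalMs,
            double scaleFactor,
            LongConsumer sleeper)
    {
        long sleptMs = 0;
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return task.get();
            }
            catch (RuntimeException e) {
                long delay = delayMs(attempt, minSleepMs, maxSleepMs, scaleFactor);
                if (attempt > retries || sleptMs + delay > maxTotalMs) {
                    // Stand-in for wrapping the failure in TableNotFoundException.
                    throw new IllegalStateException("Table metadata is missing", e);
                }
                sleeper.accept(delay);
                sleptMs += delay;
            }
        }
    }

    public static void main(String[] args)
    {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= 6; attempt++) {
            delays.add(delayMs(attempt, 100, 5000, 4.0));
        }
        System.out.println(delays); // prints [100, 400, 1600, 5000, 5000, 5000]
    }
}
```

The sleeper is passed in as a parameter so that callers can use ``Thread.sleep`` in production-like code and a no-op in tests.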

@@ -151,8 +151,8 @@ public class IcebergHiveMetadata
private final ExtendedHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));

private final FilterStatsCalculatorService filterStatsCalculatorService;
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;

public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
@@ -162,12 +162,14 @@ public IcebergHiveMetadata(
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService)
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig hiveTableOeprationsConfig)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
}

public ExtendedHiveMetastore getMetastore()
@@ -178,7 +180,7 @@ public ExtendedHiveMetastore getMetastore()
@Override
protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
{
return getHiveIcebergTable(metastore, hdfsEnvironment, session, schemaTableName);
return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName);
}

@Override
@@ -296,6 +298,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
getMetastoreContext(session),
hdfsEnvironment,
hdfsContext,
hiveTableOeprationsConfig,
schemaName,
tableName,
session.getUser(),

@@ -38,6 +38,7 @@ public class IcebergHiveMetadataFactory
final RowExpressionService rowExpressionService;
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final IcebergHiveTableOperationsConfig operationsConfig;

@Inject
public IcebergHiveMetadataFactory(
@@ -48,7 +49,8 @@ public IcebergHiveMetadataFactory(
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService)
FilterStatsCalculatorService filterStatsCalculatorService,
IcebergHiveTableOperationsConfig operationsConfig)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -58,10 +60,20 @@ public IcebergHiveMetadataFactory(
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.operationsConfig = requireNonNull(operationsConfig, "operationsConfig is null");
}

public ConnectorMetadata create()
{
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
return new IcebergHiveMetadata(
metastore,
hdfsEnvironment,
typeManager,
functionResolution,
rowExpressionService,
commitTaskCodec,
nodeVersion,
filterStatsCalculatorService,
operationsConfig);
}
}

@@ -49,6 +49,7 @@ public void setup(Binder binder)
{
install(new HiveMetastoreModule(this.connectorId, this.metastore));
binder.bind(ExtendedHiveMetastore.class).to(InMemoryCachingHiveMetastore.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(IcebergHiveTableOperationsConfig.class);

configBinder(binder).bindConfig(MetastoreClientConfig.class);
binder.bind(PartitionMutator.class).to(HivePartitionMutator.class).in(Scopes.SINGLETON);