Commit

Implement Iceberg OPTIMIZE
findepi committed Jan 13, 2022
1 parent 85b6a05 commit f0c67f0
Showing 16 changed files with 671 additions and 10 deletions.
@@ -31,6 +31,7 @@
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;
@@ -62,6 +63,7 @@ public class IcebergConnector
private final List<PropertyMetadata<?>> tableProperties;
private final Optional<ConnectorAccessControl> accessControl;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;

public IcebergConnector(
LifeCycleManager lifeCycleManager,
@@ -76,7 +78,8 @@ public IcebergConnector(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Optional<ConnectorAccessControl> accessControl,
Set<Procedure> procedures)
Set<Procedure> procedures,
Set<TableProcedureMetadata> tableProcedures)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -92,7 +95,8 @@ public IcebergConnector(
this.schemaProperties = ImmutableList.copyOf(requireNonNull(schemaProperties, "schemaProperties is null"));
this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null"));
}

@Override
@@ -150,6 +154,12 @@ public Set<Procedure> getProcedures()
return procedures;
}

@Override
public Set<TableProcedureMetadata> getTableProcedures()
{
return tableProcedures;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
@@ -14,12 +14,14 @@
package io.trino.plugin.iceberg;

import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorHandleResolver;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;

@@ -56,6 +58,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return IcebergWritableTableHandle.class;
}

@Override
public Class<? extends ConnectorTableExecuteHandle> getTableExecuteHandleClass()
{
return IcebergTableExecuteHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
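The handle resolver above now maps ConnectorTableExecuteHandle to IcebergTableExecuteHandle, which lives in the new io.trino.plugin.iceberg.procedure package; that package (including the IcebergTableProcedureId enum, whose only constant so far is OPTIMIZE) is among the changed files not rendered on this page. The following is a minimal sketch of the handle, with field names inferred from how the metadata code constructs and reads it below; it is not the committed source, and the real class also carries Jackson annotations so it can be serialized from coordinator to workers.

```java
package io.trino.plugin.iceberg.procedure;

import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.SchemaTableName;

import static java.util.Objects.requireNonNull;

// Sketch only: carries everything needed to drive one table procedure execution.
// For OPTIMIZE, procedureHandle holds an IcebergOptimizeHandle.
public class IcebergTableExecuteHandle
        implements ConnectorTableExecuteHandle
{
    private final SchemaTableName schemaTableName;
    private final IcebergTableProcedureId procedureId;
    private final Object procedureHandle; // declared loosely here; the commit may use a dedicated type
    private final String tableLocation;

    public IcebergTableExecuteHandle(
            SchemaTableName schemaTableName,
            IcebergTableProcedureId procedureId,
            Object procedureHandle,
            String tableLocation)
    {
        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
        this.procedureId = requireNonNull(procedureId, "procedureId is null");
        this.procedureHandle = requireNonNull(procedureHandle, "procedureHandle is null");
        this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
    }

    public SchemaTableName getSchemaTableName()
    {
        return schemaTableName;
    }

    public IcebergTableProcedureId getProcedureId()
    {
        return procedureId;
    }

    public Object getProcedureHandle()
    {
        return procedureHandle;
    }

    public String getTableLocation()
    {
        return tableLocation;
    }
}
```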
@@ -22,12 +22,17 @@
import com.google.common.collect.Iterables;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HiveApplyProjectionUtil;
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogSchemaName;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnHandle;
@@ -39,6 +44,7 @@
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
@@ -63,11 +69,13 @@
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
@@ -83,6 +91,7 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -122,6 +131,7 @@
import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Collections.singletonList;
@@ -546,6 +556,158 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
return new IcebergColumnHandle(primitiveColumnIdentity(0, "$row_id"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty());
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
ConnectorTableHandle connectorTableHandle,
String procedureName,
Map<String, Object> executeProperties)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;

IcebergTableProcedureId procedureId;
try {
procedureId = IcebergTableProcedureId.valueOf(procedureName);
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'");
}

switch (procedureId) {
case OPTIMIZE:
return getTableHandleForOptimize(session, tableHandle, executeProperties);
}

throw new IllegalArgumentException("Unknown procedure: " + procedureId);
}

private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
DataSize maxScannedFileSize = (DataSize) executeProperties.get("file_size_threshold");
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
OPTIMIZE,
new IcebergOptimizeHandle(
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
getColumns(icebergTable.schema(), typeManager),
getFileFormat(icebergTable),
icebergTable.properties(),
maxScannedFileSize),
icebergTable.location()));
}
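A side note on the IcebergOptimizeHandle built just above: it is another class from the unrendered procedure package. Its shape follows from the constructor arguments here and from the getters used later in finishOptimize() and in the page sink provider change further down; the sketch below assumes those field names and types (in particular the file format type) and is not the committed source.

```java
package io.trino.plugin.iceberg.procedure;

import io.airlift.units.DataSize;
import io.trino.plugin.iceberg.IcebergColumnHandle;
import org.apache.iceberg.FileFormat;

import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

// Sketch only: an immutable value object describing one OPTIMIZE run; the real
// class is also JSON-serializable so it can travel with the execute handle.
public class IcebergOptimizeHandle
{
    private final String schemaAsJson;
    private final String partitionSpecAsJson;
    private final List<IcebergColumnHandle> tableColumns;
    private final FileFormat fileFormat; // type assumed; whatever getFileFormat(icebergTable) returns
    private final Map<String, String> tableStorageProperties;
    private final DataSize maxScannedFileSize;

    public IcebergOptimizeHandle(
            String schemaAsJson,
            String partitionSpecAsJson,
            List<IcebergColumnHandle> tableColumns,
            FileFormat fileFormat,
            Map<String, String> tableStorageProperties,
            DataSize maxScannedFileSize)
    {
        this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
        this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null");
        this.tableColumns = List.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
        this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
        this.tableStorageProperties = Map.copyOf(requireNonNull(tableStorageProperties, "tableStorageProperties is null"));
        this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
    }

    public String getSchemaAsJson()
    {
        return schemaAsJson;
    }

    public String getPartitionSpecAsJson()
    {
        return partitionSpecAsJson;
    }

    public List<IcebergColumnHandle> getTableColumns()
    {
        return tableColumns;
    }

    public FileFormat getFileFormat()
    {
        return fileFormat;
    }

    public Map<String, String> getTableStorageProperties()
    {
        return tableStorageProperties;
    }

    public DataSize getMaxScannedFileSize()
    {
        return maxScannedFileSize;
    }
}
```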

@Override
public Optional<ConnectorNewTableLayout> getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
return getLayoutForOptimize(session, executeHandle);
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
}

private Optional<ConnectorNewTableLayout> getLayoutForOptimize(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
{
Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName());
return getWriteLayout(icebergTable.schema(), icebergTable.spec());
}

@Override
public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> beginTableExecute(
ConnectorSession session,
ConnectorTableExecuteHandle tableExecuteHandle,
ConnectorTableHandle updatedSourceTableHandle)
{
IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
IcebergTableHandle table = (IcebergTableHandle) updatedSourceTableHandle;
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
return beginOptimize(session, executeHandle, table);
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
}

private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> beginOptimize(
ConnectorSession session,
IcebergTableExecuteHandle executeHandle,
IcebergTableHandle table)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

verify(transaction == null, "transaction already set");
transaction = icebergTable.newTransaction();

return new BeginTableExecuteResult<>(
executeHandle,
table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
}
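A note on the contract here: beginOptimize opens a single Iceberg transaction and returns a copy of the table handle flagged for OPTIMIZE. The forOptimize(...) method itself is not visible in this diff, but judging from the split manager change below it carries the max scanned file size and a recordScannedFiles flag, so split generation both skips data files that are already larger than the threshold and remembers every file it did hand out; those remembered files are what the engine later passes back to finishTableExecute as splitSourceInfo.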

@Override
public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
finishOptimize(executeHandle, fragments, splitSourceInfo);
return;
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.getProcedureId() + "'");
}

private void finishOptimize(IcebergTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Table icebergTable = transaction.table();

// data files that were compacted; the RewriteFiles commit below removes them in favor of the newly written files
Set<DataFile> scannedFiles = splitSourceInfo.stream()
.map(DataFile.class::cast)
.collect(toImmutableSet());

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

Set<DataFile> newFiles = new HashSet<>();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(optimizeHandle.getFileFormat())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

newFiles.add(builder.build());
}

if (scannedFiles.isEmpty() && newFiles.isEmpty()) {
// Table scan turned out to be empty, nothing to commit
transaction = null;
return;
}

RewriteFiles rewriteFiles = transaction.newRewrite();
rewriteFiles.rewriteFiles(scannedFiles, newFiles);
rewriteFiles.commit();
transaction.commitTransaction();
transaction = null;
}
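The commit path relies on Iceberg's RewriteFiles operation: the scanned (small) files and the newly written (compacted) files are swapped in one new table snapshot, so concurrent readers see either the old file set or the new one, never a mix, and the whole OPTIMIZE run remains an ordinary, atomically committed Iceberg snapshot.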

@Override
public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
{
@@ -25,10 +25,12 @@
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.procedure.Procedure;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
@@ -76,5 +78,8 @@ public void configure(Binder binder)

Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
}
}
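OptimizeTableProcedure, bound here as the provider of the OPTIMIZE table procedure metadata, is another of the files not rendered on this page. Assuming the SPI's TableProcedureMetadata takes a name, an execution mode, and a list of properties, and that a dataSizeProperty helper is available, a minimal provider could look roughly like the sketch below; the description text and the 100MB default are placeholders, not taken from the commit.

```java
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.spi.connector.TableProcedureMetadata;

import javax.inject.Provider;

// assumed helper; the commit may define the DataSize property differently
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning;

public class OptimizeTableProcedure
        implements Provider<TableProcedureMetadata>
{
    @Override
    public TableProcedureMetadata get()
    {
        return new TableProcedureMetadata(
                // must match IcebergTableProcedureId.OPTIMIZE, which the metadata code resolves by name
                "OPTIMIZE",
                // the procedure rewrites data files, so it executes as a distributed write
                distributedWithFilteringAndRepartitioning(),
                ImmutableList.of(
                        // read back in getTableHandleForOptimize() as a DataSize
                        dataSizeProperty("file_size_threshold", "Only compact files smaller than the given threshold", DataSize.of(100, DataSize.Unit.MEGABYTE), false)));
    }
}
```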
@@ -16,12 +16,15 @@
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.PartitionSpec;
@@ -93,4 +96,34 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
tableHandle.getFileFormat(),
maxOpenPartitions);
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle;
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
HdfsContext hdfsContext = new HdfsContext(session);
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.getProcedureHandle();
Schema schema = SchemaParser.fromJson(optimizeHandle.getSchemaAsJson());
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, optimizeHandle.getPartitionSpecAsJson());
LocationProvider locationProvider = getLocationProvider(
        executeHandle.getSchemaTableName(),
        executeHandle.getTableLocation(),
        optimizeHandle.getTableStorageProperties());
return new IcebergPageSink(
schema,
partitionSpec,
locationProvider,
fileWriterFactory,
pageIndexerFactory,
hdfsEnvironment,
hdfsContext,
optimizeHandle.getTableColumns(),
jsonCodec,
session,
optimizeHandle.getFileFormat(),
maxOpenPartitions);
}

throw new IllegalArgumentException("Unknown procedure: " + executeHandle.getProcedureId());
}
}
@@ -66,6 +66,9 @@ public ConnectorSplitSource getSplits(
IcebergTableHandle table = (IcebergTableHandle) handle;

if (table.getSnapshotId().isEmpty()) {
if (table.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
}

@@ -85,9 +88,11 @@
table,
identityPartitionColumns,
tableScan,
table.getMaxScannedFileSize(),
dynamicFilter,
dynamicFilteringWaitTimeout,
constraint);
constraint,
table.isRecordScannedFiles());

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
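Only six of the sixteen changed files are rendered above; the rest include the new io.trino.plugin.iceberg.procedure classes sketched earlier plus tests. End to end: the connector advertises the OPTIMIZE table procedure, the metadata code turns an execute request into an IcebergTableExecuteHandle and flags the table handle so that split generation hands out only data files below the size threshold while recording them, the page sink writes the compacted replacements, and finishOptimize commits a RewriteFiles snapshot that swaps the old files for the new ones. From a client's point of view the feature is driven by ALTER TABLE ... EXECUTE. The following is a hypothetical JDBC invocation; the endpoint, credentials, and table name are made up, and the property syntax follows the documented form for Trino table procedures.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class OptimizeExample
{
    public static void main(String[] args)
            throws SQLException
    {
        // Illustrative only: cluster URL, user, and table are placeholders.
        String url = "jdbc:trino://localhost:8080/iceberg/tpch";
        try (Connection connection = DriverManager.getConnection(url, "admin", null);
                Statement statement = connection.createStatement()) {
            // Compact data files smaller than the threshold into larger ones
            statement.execute("ALTER TABLE orders EXECUTE optimize(file_size_threshold => '128MB')");
        }
    }
}
```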