Enable failure recovery for Iceberg connector #10622

Merged: 4 commits, Feb 14, 2022
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import io.airlift.json.JsonCodec;
+import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

@@ -26,20 +27,23 @@ public class IcebergMetadataFactory
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalogFactory catalogFactory;
+private final HdfsEnvironment hdfsEnvironment;

@Inject
public IcebergMetadataFactory(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
-TrinoCatalogFactory catalogFactory)
+TrinoCatalogFactory catalogFactory,
+HdfsEnvironment hdfsEnvironment)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
+this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
{
-return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity));
+return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity), hdfsEnvironment);
}
}
@@ -323,7 +323,9 @@ private void closeWriter(WriteContext writeContext)

private WriteContext createWriter(Optional<PartitionData> partitionData)
{
-String fileName = fileFormat.toIceberg().addExtension(randomUUID().toString());
+// Prepend the query id to the file name so we can determine which files were written by which query.
+// This is needed for opportunistic cleanup of extra files that may be left behind for a successfully
+// completed query when failure recovery mechanisms are in use.
+String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName)))
.orElse(new Path(locationProvider.newDataLocation(fileName)));

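The comment above describes opportunistic cleanup of files left behind by failed or retried write attempts. As a rough illustration of what the new naming scheme enables, here is a hypothetical helper (not part of this change) built on the Hadoop FileSystem API the connector already uses: files are named <queryId>-<randomUUID>.<extension>, so a prefix match identifies everything a given query wrote under a location.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class QueryFileCleanup
{
    private QueryFileCleanup() {}

    // Hypothetical helper: delete data files written on behalf of the given query id.
    static void deleteFilesWrittenBy(FileSystem fileSystem, Path location, String queryId)
            throws IOException
    {
        for (FileStatus status : fileSystem.listStatus(location)) {
            // File names start with the writing query's id, so a prefix match is sufficient.
            if (status.isFile() && status.getPath().getName().startsWith(queryId + "-")) {
                fileSystem.delete(status.getPath(), false);
            }
        }
    }
}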
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.RetryMode;

import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ public class IcebergWritableTableHandle
private final String outputPath;
private final IcebergFileFormat fileFormat;
private final Map<String, String> storageProperties;
+private final RetryMode retryMode;

@JsonCreator
public IcebergWritableTableHandle(
@@ -45,7 +47,8 @@ public IcebergWritableTableHandle(
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
@JsonProperty("outputPath") String outputPath,
@JsonProperty("fileFormat") IcebergFileFormat fileFormat,
-@JsonProperty("properties") Map<String, String> storageProperties)
+@JsonProperty("properties") Map<String, String> storageProperties,
+@JsonProperty("retryMode") RetryMode retryMode)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -55,6 +58,7 @@ public IcebergWritableTableHandle(
this.outputPath = requireNonNull(outputPath, "outputPath is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.storageProperties = requireNonNull(storageProperties, "storageProperties is null");
+this.retryMode = requireNonNull(retryMode, "retryMode is null");
}

@JsonProperty
@@ -105,6 +109,12 @@ public Map<String, String> getStorageProperties()
return storageProperties;
}

+@JsonProperty
+public RetryMode getRetryMode()
+{
+return retryMode;
+}

@Override
public String toString()
{
@@ -35,6 +35,7 @@ public class IcebergOptimizeHandle
private final IcebergFileFormat fileFormat;
private final Map<String, String> tableStorageProperties;
private final DataSize maxScannedFileSize;
+private final boolean retriesEnabled;

@JsonCreator
public IcebergOptimizeHandle(
@@ -43,14 +44,16 @@ public IcebergOptimizeHandle(
List<IcebergColumnHandle> tableColumns,
IcebergFileFormat fileFormat,
Map<String, String> tableStorageProperties,
-DataSize maxScannedFileSize)
+DataSize maxScannedFileSize,
+boolean retriesEnabled)
{
this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null");
this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.tableStorageProperties = ImmutableMap.copyOf(requireNonNull(tableStorageProperties, "tableStorageProperties is null"));
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
+this.retriesEnabled = retriesEnabled;
}

@JsonProperty
@@ -88,4 +91,10 @@ public DataSize getMaxScannedFileSize()
{
return maxScannedFileSize;
}

+@JsonProperty
+public boolean isRetriesEnabled()

Member:
could the RetryMode be provided by the engine to the finish*() methods, so that we don't have to embed the info in handles?

Member Author:
We could, but I would opt for what we do now. It feels more natural, since we know the information upfront. It also allows for rejecting the request sooner if the given retry mode is not supported by a connector.

Member:
> allows for rejecting the request sooner if given retry mode is not supported by a connector.

I did not suggest not providing it to the begin*() methods.

Member Author:
That would be asymmetric with the other things we pass to the begin*() methods (layout, list of columns). I will leave it as is.

+{
+return retriesEnabled;
+}
}
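To make the trade-off discussed in the review thread above concrete: the retry mode arrives when the write begins, can be validated immediately, and then travels inside the handle so the finish path can read it without the engine passing it again. A trimmed-down, hypothetical sketch of that pattern (the class and method names below are illustrative; the actual change stores RetryMode on IcebergWritableTableHandle as shown in this diff):

import io.trino.spi.TrinoException;
import io.trino.spi.connector.RetryMode;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static java.util.Objects.requireNonNull;

// Hypothetical, trimmed-down handle; the real IcebergWritableTableHandle carries many more fields.
final class WriteHandleSketch
{
    private final RetryMode retryMode;

    private WriteHandleSketch(RetryMode retryMode)
    {
        this.retryMode = requireNonNull(retryMode, "retryMode is null");
    }

    RetryMode getRetryMode()
    {
        return retryMode;
    }

    // Models the "reject early" argument from the thread: if retries are not supported for this
    // operation, fail at begin time instead of surfacing the problem at finish/commit time.
    static WriteHandleSketch begin(RetryMode retryMode, boolean writeRetriesSupported)
    {
        if (retryMode != NO_RETRIES && !writeRetriesSupported) {
            throw new TrinoException(NOT_SUPPORTED, "This connector does not support retries for this operation");
        }
        return new WriteHandleSketch(retryMode);
    }
}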
@@ -0,0 +1,113 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.operator.RetryPolicy;
import io.trino.testing.BaseFailureRecoveryTest;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

public abstract class BaseIcebergFailureRecoveryTest
extends BaseFailureRecoveryTest
{
protected BaseIcebergFailureRecoveryTest(RetryPolicy retryPolicy)
{
super(retryPolicy);
}

@Override
protected boolean areWriteRetriesSupported()
{
return true;
}

@Override
public void testAnalyzeStatistics()
{
assertThatThrownBy(super::testAnalyzeStatistics)
.hasMessageContaining("This connector does not support analyze");
}

@Override
public void testDelete()
{
assertThatThrownBy(super::testDelete)
.hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
}

@Override
public void testDeleteWithSubquery()
{
assertThatThrownBy(super::testDelete)
.hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
}

@Override
protected void createPartitionedLineitemTable(String tableName, List<String> columns, String partitionColumn)
{
@Language("SQL") String sql = format(
"CREATE TABLE %s WITH (partitioning=array['%s']) AS SELECT %s FROM tpch.tiny.lineitem",
tableName,
partitionColumn,
String.join(",", columns));
getQueryRunner().execute(sql);
}

@Override
public void testUpdate()
{
assertThatThrownBy(super::testUpdate)
.hasMessageContaining("This connector does not support updates");
}

@Override
public void testUpdateWithSubquery()
{
assertThatThrownBy(super::testUpdateWithSubquery)
.hasMessageContaining("This connector does not support updates");
}

@Test(invocationCount = INVOCATION_COUNT)
public void testCreatePartitionedTable()
{
testTableModification(
Optional.empty(),
"CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders",
Optional.of("DROP TABLE <table>"));
}

@Test(invocationCount = INVOCATION_COUNT)
public void testInsertIntoNewPartition()
{
testTableModification(
Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
"INSERT INTO <table> SELECT *, 'partition2' p FROM orders",
Optional.of("DROP TABLE <table>"));
}

@Test(invocationCount = INVOCATION_COUNT)
public void testInsertIntoExistingPartition()
{
testTableModification(
Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
"INSERT INTO <table> SELECT *, 'partition1' p FROM orders",
Optional.of("DROP TABLE <table>"));
}
}
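The concrete per-retry-policy test classes that hook this base class up to a query runner are not shown in this excerpt. A minimal sketch of what one could look like, with the class name and wiring being assumptions rather than code from this PR:

import io.trino.operator.RetryPolicy;

// Hypothetical subclass name; the PR's actual concrete test classes are not shown above.
public class TestIcebergQueryFailureRecovery
        extends BaseIcebergFailureRecoveryTest
{
    protected TestIcebergQueryFailureRecovery()
    {
        // Run the shared failure-recovery tests under query-level retries.
        super(RetryPolicy.QUERY);
    }

    // Any abstract query-runner hooks declared by BaseFailureRecoveryTest (not visible in this
    // excerpt) would still need to be implemented here, e.g. by building a runner with
    // IcebergQueryRunner.builder() and the desired retry configuration.
}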
@@ -13,10 +13,10 @@
*/
package io.trino.plugin.iceberg;

+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
-import io.trino.Session;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.tpch.TpchTable;
@@ -28,9 +28,11 @@
import java.util.Map;
import java.util.Optional;

+import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.testing.QueryAssertions.copyTpchTables;
import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.util.Objects.requireNonNull;

public final class IcebergQueryRunner
{
@@ -69,31 +71,90 @@ public static DistributedQueryRunner createIcebergQueryRunner(
Optional<File> metastoreDirectory)
throws Exception
{
-Session session = testSessionBuilder()
-.setCatalog(ICEBERG_CATALOG)
-.setSchema("tpch")
-.build();

-DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
+Builder builder = builder()
.setExtraProperties(extraProperties)
-.build();
+.setIcebergProperties(connectorProperties)
+.setInitialTables(tables);

+metastoreDirectory.ifPresent(builder::setMetastoreDirectory);
+return builder.build();
+}

-queryRunner.installPlugin(new TpchPlugin());
-queryRunner.createCatalog("tpch", "tpch");
+public static Builder builder()
+{
+return new Builder();
+}

-Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));
+public static class Builder
+extends DistributedQueryRunner.Builder<Builder>
+{
+private Optional<File> metastoreDirectory = Optional.empty();
+private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
+private List<TpchTable<?>> initialTables = ImmutableList.of();

+protected Builder()
+{
+super(testSessionBuilder()
+.setCatalog(ICEBERG_CATALOG)
+.setSchema("tpch")
+.build());
+}

-queryRunner.installPlugin(new IcebergPlugin());
-connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties));
-connectorProperties.putIfAbsent("iceberg.catalog.type", "TESTING_FILE_METASTORE");
-connectorProperties.putIfAbsent("hive.metastore.catalog.dir", dataDir.toString());
-queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", connectorProperties);
+public Builder setMetastoreDirectory(File metastoreDirectory)
+{
+this.metastoreDirectory = Optional.of(metastoreDirectory);
+return self();
+}

-queryRunner.execute("CREATE SCHEMA tpch");
+public Builder setIcebergProperties(Map<String, String> icebergProperties)
+{
+this.icebergProperties = ImmutableMap.<String, String>builder()
+.putAll(requireNonNull(icebergProperties, "icebergProperties is null"));
+return self();
+}

-copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, session, tables);
+public Builder addIcebergProperty(String key, String value)
+{
+this.icebergProperties.put(key, value);
+return self();
+}

-return queryRunner;
+public Builder setInitialTables(Iterable<TpchTable<?>> initialTables)
+{
+this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null"));
+return self();
+}

+@Override
+public DistributedQueryRunner build()
+throws Exception
+{
+DistributedQueryRunner queryRunner = super.build();
+try {
+queryRunner.installPlugin(new TpchPlugin());
+queryRunner.createCatalog("tpch", "tpch");

+Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));

+queryRunner.installPlugin(new IcebergPlugin());
+Map<String, String> icebergProperties = new HashMap<>();
+icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE");
+icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString());
+icebergProperties.putAll(this.icebergProperties.build());

+queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);

+queryRunner.execute("CREATE SCHEMA tpch");

+copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, queryRunner.getDefaultSession(), initialTables);

+return queryRunner;
+}
+catch (Exception e) {
+closeAllSuppress(e, queryRunner);
+throw e;
+}
+}
}

public static void main(String[] args)
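For completeness, a short usage sketch of the IcebergQueryRunner.Builder introduced above. The specific property values (the engine's retry-policy setting and iceberg.file-format) are illustrative assumptions, not values configured by this PR; addIcebergProperty and setInitialTables are the Builder methods added in this diff.

import java.util.Map;

import io.trino.testing.DistributedQueryRunner;
import io.trino.tpch.TpchTable;

public final class IcebergFailureRecoveryQueryRunner
{
    private IcebergFailureRecoveryQueryRunner() {}

    public static DistributedQueryRunner create()
            throws Exception
    {
        return IcebergQueryRunner.builder()
                // Assumed engine-level setting to exercise failure recovery; not set by this diff.
                .setExtraProperties(Map.of("retry-policy", "QUERY"))
                .addIcebergProperty("iceberg.file-format", "ORC")
                .setInitialTables(TpchTable.getTables())
                .build();
    }
}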