Skip to content

Commit

Permalink
Add iceberg MinIO connector smoke test
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkadiusz Czajkowski committed Mar 2, 2022
1 parent a60e7a9 commit b46e0d6
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public HiveMinioDataLake(String bucketName, Map<String, String> hiveHadoopFilesT
this.hiveHadoop = closer.register(
HiveHadoop.builder()
.withFilesToMount(ImmutableMap.<String, String>builder()
.put("hive_s3_insert_overwrite/hive-core-site.xml", "/etc/hadoop/conf/core-site.xml")
.put("hive_minio_datalake/hive-core-site.xml", "/etc/hadoop/conf/core-site.xml")
.putAll(hiveHadoopFilesToMount)
.buildOrThrow())
.withImage(hiveHadoopImage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@

import io.trino.testing.BaseConnectorSmokeTest;
import io.trino.testing.TestingConnectorBehavior;
import org.apache.iceberg.FileFormat;
import org.testng.annotations.Test;

import java.io.File;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public abstract class BaseIcebergConnectorSmokeTest
extends BaseConnectorSmokeTest
{
protected final FileFormat format;

public BaseIcebergConnectorSmokeTest(FileFormat format)
{
this.format = requireNonNull(format, "format is null");
}

@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
Expand Down Expand Up @@ -59,17 +66,17 @@ public void testRowLevelDelete()
@Override
public void testShowCreateTable()
{
File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile();
String schemaName = getSession().getSchema().orElseThrow();
assertThat((String) computeScalar("SHOW CREATE TABLE region"))
.isEqualTo("" +
"CREATE TABLE iceberg.tpch.region (\n" +
.matches("" +
"CREATE TABLE iceberg." + schemaName + ".region \\(\n" +
" regionkey bigint,\n" +
" name varchar,\n" +
" comment varchar\n" +
")\n" +
"WITH (\n" +
" format = 'ORC',\n" +
format(" location = '%s/iceberg_data/tpch/region'\n", tempDir) +
")");
"\\)\n" +
"WITH \\(\n" +
" format = '" + format.name() + "',\n" +
format(" location = '.*/" + schemaName + "/region'\n") +
"\\)");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.testing.QueryRunner;
import org.apache.iceberg.FileFormat;
import org.testng.annotations.Test;

import java.util.Locale;
import java.util.Optional;

import static io.trino.plugin.hive.containers.HiveMinioDataLake.ACCESS_KEY;
import static io.trino.plugin.hive.containers.HiveMinioDataLake.SECRET_KEY;
import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;

public abstract class BaseIcebergMinioConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private final String schemaName;
private final String bucketName;

private HiveMinioDataLake hiveMinioDataLake;

public BaseIcebergMinioConnectorSmokeTest(FileFormat format)
{
super(format);
this.schemaName = "tpch_" + format.name().toLowerCase(Locale.ENGLISH);
this.bucketName = "test-iceberg-minio-smoke-test-" + randomTableSuffix();
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
this.hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, ImmutableMap.of()));
this.hiveMinioDataLake.start();
return createIcebergQueryRunner(
ImmutableMap.of(),
ImmutableMap.<String, String>builder()
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", "thrift://" + hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.put("hive.s3.aws-access-key", ACCESS_KEY)
.put("hive.s3.aws-secret-key", SECRET_KEY)
.put("hive.s3.endpoint", "http://" + hiveMinioDataLake.getMinio().getMinioApiEndpoint())
.put("hive.s3.path-style-access", "true")
.put("hive.s3.streaming.part-size", "5MB")
.buildOrThrow(),
SchemaInitializer.builder()
.withSchemaName(schemaName)
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
.withSchemaProperties(ImmutableMap.of(
"location", "'s3://" + bucketName + "/" + schemaName + "'"))
.build(),
Optional.empty());
}

@Override
protected String createSchemaSql(String schemaName)
{
return "CREATE SCHEMA IF NOT EXISTS " + schemaName + " WITH (location = 's3://" + bucketName + "/" + schemaName + "')";
}

@Test
@Override
public void testRenameSchema()
{
assertQueryFails(
format("ALTER SCHEMA %s RENAME TO %s", schemaName, schemaName + randomTableSuffix()),
"Hive metastore does not support renaming schemas");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
import io.trino.testing.QueryRunner;

import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static org.apache.iceberg.FileFormat.ORC;

// Redundant over TestIcebergOrcConnectorTest, but exists to exercise BaseConnectorSmokeTest
// Some features like materialized views may be supported by Iceberg only.
public class TestIcebergConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
public TestIcebergConnectorSmokeTest()
{
super(ORC);
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import static org.apache.iceberg.FileFormat.ORC;

public class TestIcebergMinioOrcConnectorSmokeTest
extends BaseIcebergMinioConnectorSmokeTest
{
public TestIcebergMinioOrcConnectorSmokeTest()
{
super(ORC);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import static org.apache.iceberg.FileFormat.PARQUET;

public class TestIcebergMinioParquetConnectorSmokeTest
extends BaseIcebergMinioConnectorSmokeTest
{
public TestIcebergMinioParquetConnectorSmokeTest()
{
super(PARQUET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return connectorBehavior.hasBehaviorByDefault(this::hasBehavior);
}

protected String createSchemaSql(String schemaName)
{
return "CREATE SCHEMA " + schemaName;
}

/**
* Ensure the tests are run with {@link DistributedQueryRunner}. E.g. {@link LocalQueryRunner} takes some
* shortcuts, not exercising certain aspects.
Expand Down Expand Up @@ -245,11 +250,11 @@ public void testCreateSchema()
{
String schemaName = "test_schema_create_" + randomTableSuffix();
if (!hasBehavior(SUPPORTS_CREATE_SCHEMA)) {
assertQueryFails("CREATE SCHEMA " + schemaName, "This connector does not support creating schemas");
assertQueryFails(createSchemaSql(schemaName), "This connector does not support creating schemas");
return;
}

assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));
assertThat(query("SHOW SCHEMAS"))
.skippingTypesCheck()
.containsAll(format("VALUES '%s', '%s'", getSession().getSchema().orElseThrow(), schemaName));
Expand Down Expand Up @@ -339,7 +344,7 @@ public void testRenameTableAcrossSchemas()
assertUpdate("CREATE TABLE " + oldTable + " (a bigint, b double)");

String schemaName = "test_schema_" + randomTableSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));

String newTable = schemaName + ".test_rename_new_" + randomTableSuffix();
assertUpdate("ALTER TABLE " + oldTable + " RENAME TO " + newTable);
Expand Down

0 comments on commit b46e0d6

Please sign in to comment.