Skip to content

Commit

Permalink
[Iceberg]Refactor test classes to avoid them growing too large
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Aug 19, 2024
1 parent 56223eb commit ca7d028
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -95,9 +95,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT;
import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
Expand All @@ -119,16 +117,14 @@
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public abstract class IcebergDistributedTestBase
extends AbstractTestDistributedQueries
extends AbstractTestQueryFramework
{
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
private final CatalogType catalogType;
Expand All @@ -152,34 +148,18 @@ protected QueryRunner createQueryRunner()
return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), catalogType, extraConnectorProperties);
}

@Override
protected boolean supportsNotNullColumns()
{
return false;
}

@Override
public void testRenameTable()
{
}

@Override
public void testRenameColumn()
{
}

@Override
public void testDelete()
@Test
public void testDeleteOnV1Table()
{
// Test delete all rows
long totalCount = (long) getQueryRunner().execute("CREATE TABLE test_delete as select * from lineitem")
long totalCount = (long) getQueryRunner().execute("CREATE TABLE test_delete with (format_version = '1') as select * from lineitem")
.getOnlyValue();
assertUpdate("DELETE FROM test_delete", totalCount);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_delete");

// Test delete whole partitions identified by one partition column
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1", countPart1);
Expand All @@ -193,7 +173,7 @@ public void testDelete()
assertQuerySucceeds("DROP TABLE test_partitioned_drop");

// Test delete whole partitions identified by two partition columns
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1F = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1 and linestatus = 'F'").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1 and linestatus = 'F'", countPart1F);
Expand All @@ -209,14 +189,10 @@ public void testDelete()
assertEquals(totalCount - countPart1F - countPart2O - countPartOther, newTotalCount);
assertQuerySucceeds("DROP TABLE test_partitioned_drop");

// Support delete with filters on non-identity partition column
assertUpdate("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem", totalCount);
long countOrder1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where orderkey = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE orderkey = 1", countOrder1);
long countPartKey100 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where partkey > 100").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE partkey > 100", countPartKey100);
long countLine1Order1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1 and orderkey = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1 and orderkey = 1", countLine1Order1);
String errorMessage1 = "This connector only supports delete where one or more partitions are deleted entirely for table versions older than 2";
// Do not support delete with filters on non-identity partition column on v1 table
assertUpdate("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem", totalCount);
assertQueryFails("DELETE FROM test_partitioned_drop WHERE orderkey = 1", errorMessage1);

// Do not allow delete data at specified snapshot
String errorMessage2 = "This connector do not allow delete data at specified snapshot";
Expand All @@ -229,12 +205,6 @@ public void testDelete()
assertQuerySucceeds("DROP TABLE test_partitioned_drop");
}

@Override
public void testUpdate()
{
// Updates are not supported by the connector
}

@Test
public void testDeleteWithPartitionSpecEvolution()
{
Expand Down Expand Up @@ -534,26 +504,6 @@ public void testShowColumnsForPartitionedTable()
assertEquals(actual, expectedParametrizedVarchar);
}

@Override
public void testShowColumns()
{
MaterializedResult actual = computeActual("SHOW COLUMNS FROM orders");

MaterializedResult expectedParametrizedVarchar = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("orderkey", "bigint", "", "")
.row("custkey", "bigint", "", "")
.row("orderstatus", "varchar", "", "")
.row("totalprice", "double", "", "")
.row("orderdate", "date", "", "")
.row("orderpriority", "varchar", "", "")
.row("clerk", "varchar", "", "")
.row("shippriority", "integer", "", "")
.row("comment", "varchar", "", "")
.build();

assertEquals(actual, expectedParametrizedVarchar);
}

@DataProvider(name = "timezones")
public Object[][] timezones()
{
Expand Down Expand Up @@ -686,86 +636,6 @@ public void testPartitionedByVarbinaryType()
assertQuerySucceeds("drop table test_partition_columns_varbinary");
}

@Override
public void testDescribeOutput()
{
}

@Override
public void testDescribeOutputNamedAndUnnamed()
{
}

/**
* Increased the optimizer timeout from 15000ms to 25000ms
*/
@Override
public void testLargeIn()
{
String longValues = range(0, 5000)
.mapToObj(Integer::toString)
.collect(joining(", "));
Session session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "25000ms")
.build();
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey IN (" + longValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey NOT IN (" + longValues + ")");

assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey IN (mod(1000, orderkey), " + longValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey NOT IN (mod(1000, orderkey), " + longValues + ")");

String varcharValues = range(0, 5000)
.mapToObj(i -> "'" + i + "'")
.collect(joining(", "));
assertQuery(session, "SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) IN (" + varcharValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) NOT IN (" + varcharValues + ")");

String arrayValues = range(0, 5000)
.mapToObj(i -> format("ARRAY[%s, %s, %s]", i, i + 1, i + 2))
.collect(joining(", "));
assertQuery(session, "SELECT ARRAY[0, 0, 0] in (ARRAY[0, 0, 0], " + arrayValues + ")", "values true");
assertQuery(session, "SELECT ARRAY[0, 0, 0] in (" + arrayValues + ")", "values false");
}

/**
* Increased the optimizer timeouts from 30000ms and 20000ms to 40000ms and 30000ms respectively
*/
@Override
public void testLargeInWithHistograms()
{
String longValues = range(0, 10_000)
.mapToObj(Integer::toString)
.collect(joining(", "));
String query = "select orderpriority, sum(totalprice) from lineitem join orders on lineitem.orderkey = orders.orderkey where orders.orderkey in (" + longValues + ") group by 1";
Session session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "40000ms")
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds(session, query);
session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "30000ms")
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "false")
.build();
assertQuerySucceeds(session, query);
}

@Override
@Test
public void testStringFilters()
{
// Type not supported for Iceberg: CHAR(10). Only test VARCHAR(10).
assertUpdate("CREATE TABLE test_varcharn_filter (shipmode VARCHAR(10))");
assertTrue(getQueryRunner().tableExists(getSession(), "test_varcharn_filter"));
assertTableColumnNames("test_varcharn_filter", "shipmode");
assertUpdate("INSERT INTO test_varcharn_filter SELECT shipmode FROM lineitem", 60175);

assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR'", "VALUES (8491)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'NONEXIST'", "VALUES (0)");
}

@Test
public void testReadWriteStats()
{
Expand Down
Loading

0 comments on commit ca7d028

Please sign in to comment.