Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Iceberg] Refactor test classes to avoid them growing too large #23466

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -95,9 +95,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT;
import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
Expand All @@ -119,16 +117,14 @@
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public abstract class IcebergDistributedTestBase
extends AbstractTestDistributedQueries
extends AbstractTestQueryFramework
{
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
private final CatalogType catalogType;
Expand All @@ -152,34 +148,18 @@ protected QueryRunner createQueryRunner()
return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), catalogType, extraConnectorProperties);
}

/**
 * Always returns {@code false}.
 * NOTE(review): judging by the name, this tells inherited tests that NOT NULL columns
 * are unsupported here — confirm against the superclass that declares it.
 */
@Override
protected boolean supportsNotNullColumns()
{
return false;
}

/**
 * Overrides the inherited test with an empty body, so it performs no checks in this class.
 * NOTE(review): the reason for disabling it is not stated here — confirm before re-enabling.
 */
@Override
public void testRenameTable()
{
}

/**
 * Overrides the inherited test with an empty body, so it performs no checks in this class.
 * NOTE(review): the reason for disabling it is not stated here — confirm before re-enabling.
 */
@Override
public void testRenameColumn()
{
}

@Override
public void testDelete()
@Test
public void testDeleteOnV1Table()
{
// Test delete all rows
long totalCount = (long) getQueryRunner().execute("CREATE TABLE test_delete as select * from lineitem")
long totalCount = (long) getQueryRunner().execute("CREATE TABLE test_delete with (format_version = '1') as select * from lineitem")
.getOnlyValue();
assertUpdate("DELETE FROM test_delete", totalCount);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_delete");

// Test delete whole partitions identified by one partition column
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1", countPart1);
Expand All @@ -193,7 +173,7 @@ public void testDelete()
assertQuerySucceeds("DROP TABLE test_partitioned_drop");

// Test delete whole partitions identified by two partition columns
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1F = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1 and linestatus = 'F'").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1 and linestatus = 'F'", countPart1F);
Expand All @@ -209,14 +189,10 @@ public void testDelete()
assertEquals(totalCount - countPart1F - countPart2O - countPartOther, newTotalCount);
assertQuerySucceeds("DROP TABLE test_partitioned_drop");

// Support delete with filters on non-identity partition column
assertUpdate("CREATE TABLE test_partitioned_drop WITH (partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem", totalCount);
long countOrder1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where orderkey = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE orderkey = 1", countOrder1);
long countPartKey100 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where partkey > 100").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE partkey > 100", countPartKey100);
long countLine1Order1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1 and orderkey = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1 and orderkey = 1", countLine1Order1);
String errorMessage1 = "This connector only supports delete where one or more partitions are deleted entirely for table versions older than 2";
// Do not support delete with filters on non-identity partition column on v1 table
assertUpdate("CREATE TABLE test_partitioned_drop WITH (format_version = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem", totalCount);
assertQueryFails("DELETE FROM test_partitioned_drop WHERE orderkey = 1", errorMessage1);

// Do not allow delete data at specified snapshot
String errorMessage2 = "This connector do not allow delete data at specified snapshot";
Expand All @@ -229,12 +205,6 @@ public void testDelete()
assertQuerySucceeds("DROP TABLE test_partitioned_drop");
}

/**
 * Overrides the inherited test with an empty body, so it performs no checks in this class.
 * The inline comment gives the reason.
 */
@Override
public void testUpdate()
{
// Updates are not supported by the connector
}

@Test
public void testDeleteWithPartitionSpecEvolution()
{
Expand Down Expand Up @@ -534,26 +504,6 @@ public void testShowColumnsForPartitionedTable()
assertEquals(actual, expectedParametrizedVarchar);
}

/**
 * Runs {@code SHOW COLUMNS FROM orders} and compares the result with the nine expected
 * rows; each row carries the column name, its type, and two empty strings.
 */
@Override
public void testShowColumns()
{
    MaterializedResult shownColumns = computeActual("SHOW COLUMNS FROM orders");

    // The expected rows are listed in the same order as the original assertion.
    assertEquals(
            shownColumns,
            resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
                    .row("orderkey", "bigint", "", "")
                    .row("custkey", "bigint", "", "")
                    .row("orderstatus", "varchar", "", "")
                    .row("totalprice", "double", "", "")
                    .row("orderdate", "date", "", "")
                    .row("orderpriority", "varchar", "", "")
                    .row("clerk", "varchar", "", "")
                    .row("shippriority", "integer", "", "")
                    .row("comment", "varchar", "", "")
                    .build());
}

@DataProvider(name = "timezones")
public Object[][] timezones()
{
Expand Down Expand Up @@ -686,86 +636,6 @@ public void testPartitionedByVarbinaryType()
assertQuerySucceeds("drop table test_partition_columns_varbinary");
}

/**
 * Overrides the inherited test with an empty body, so it performs no checks in this class.
 * NOTE(review): the reason for disabling it is not stated here — confirm before re-enabling.
 */
@Override
public void testDescribeOutput()
{
}

/**
 * Overrides the inherited test with an empty body, so it performs no checks in this class.
 * NOTE(review): the reason for disabling it is not stated here — confirm before re-enabling.
 */
@Override
public void testDescribeOutputNamedAndUnnamed()
{
}

/**
 * Runs IN / NOT IN queries whose value lists hold 5000 generated entries: integer
 * literals, quoted string literals, and three-element array literals.
 * Every query uses a session with the iterative optimizer timeout set to 25000ms
 * (increased from 15000ms).
 */
@Override
public void testLargeIn()
{
    String integerList = range(0, 5000)
            .mapToObj(Integer::toString)
            .collect(joining(", "));
    Session extendedTimeout = Session.builder(getSession())
            .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "25000ms")
            .build();

    // Plain integer list, then the same list preceded by a per-row expression.
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE orderkey IN (%s)", integerList));
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE orderkey NOT IN (%s)", integerList));
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE orderkey IN (mod(1000, orderkey), %s)", integerList));
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE orderkey NOT IN (mod(1000, orderkey), %s)", integerList));

    // Same values as quoted string literals, compared against the key cast to VARCHAR.
    String quotedList = range(0, 5000)
            .mapToObj(value -> format("'%s'", value))
            .collect(joining(", "));
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) IN (%s)", quotedList));
    assertQuery(extendedTimeout, format("SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) NOT IN (%s)", quotedList));

    // Array literals ARRAY[i, i + 1, i + 2]; the probe ARRAY[0, 0, 0] matches only when listed explicitly.
    String arrayList = range(0, 5000)
            .mapToObj(value -> "ARRAY[" + value + ", " + (value + 1) + ", " + (value + 2) + "]")
            .collect(joining(", "));
    assertQuery(extendedTimeout, format("SELECT ARRAY[0, 0, 0] in (ARRAY[0, 0, 0], %s)", arrayList), "values true");
    assertQuery(extendedTimeout, format("SELECT ARRAY[0, 0, 0] in (%s)", arrayList), "values false");
}

/**
 * Runs a join/aggregation query with a 10,000-entry IN list twice: first with
 * optimizer histograms enabled and a 40000ms iterative optimizer timeout, then with
 * histograms disabled and a 30000ms timeout (increased from 30000ms and 20000ms
 * respectively). Both runs only need to succeed.
 */
@Override
public void testLargeInWithHistograms()
{
    String orderKeys = range(0, 10_000)
            .mapToObj(Integer::toString)
            .collect(joining(", "));
    String joinQuery = format(
            "select orderpriority, sum(totalprice) from lineitem join orders on lineitem.orderkey = orders.orderkey where orders.orderkey in (%s) group by 1",
            orderKeys);

    Session histogramsEnabled = Session.builder(getSession())
            .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "40000ms")
            .setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
            .build();
    assertQuerySucceeds(histogramsEnabled, joinQuery);

    Session histogramsDisabled = Session.builder(getSession())
            .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "30000ms")
            .setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "false")
            .build();
    assertQuerySucceeds(histogramsDisabled, joinQuery);
}

/**
 * Checks equality filters on a {@code VARCHAR(10)} column filled from {@code lineitem.shipmode}.
 * Creates {@code test_varcharn_filter}, verifies that it exists with the single column
 * {@code shipmode}, inserts the shipmode values (expected update count 60175), and then
 * asserts the row counts matched by several string literals.
 */
@Override
@Test
public void testStringFilters()
{
// Type not supported for Iceberg: CHAR(10). Only test VARCHAR(10).
assertUpdate("CREATE TABLE test_varcharn_filter (shipmode VARCHAR(10))");
assertTrue(getQueryRunner().tableExists(getSession(), "test_varcharn_filter"));
assertTableColumnNames("test_varcharn_filter", "shipmode");
assertUpdate("INSERT INTO test_varcharn_filter SELECT shipmode FROM lineitem", 60175);

assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR'", "VALUES (8491)");
// NOTE(review): the next three assertions are textually identical in this view; presumably
// the literals were meant to differ in the amount of trailing whitespace — confirm against
// the repository before treating them as duplicates.
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'NONEXIST'", "VALUES (0)");
}

@Test
public void testReadWriteStats()
{
Expand Down
Loading
Loading