Commit e12e70e

Add integration tests for fault tolerant execution

arhimondr authored and martint committed Jan 21, 2022
1 parent 790a24c commit e12e70e
Showing 19 changed files with 559 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -342,6 +342,7 @@ jobs:
- ":trino-hive,:trino-orc"
- ":trino-hive,:trino-parquet -P test-parquet"
- ":trino-hive -P test-failure-recovery"
- ":trino-hive -P test-fault-tolerant-execution"
- ":trino-mongodb,:trino-kafka,:trino-elasticsearch"
- ":trino-elasticsearch -P test-stats"
- ":trino-redis"
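The new CI matrix entry mirrors the existing test-failure-recovery one: the workflow expands each entry into a Maven module selector plus profile flag (roughly mvn test -pl :trino-hive -P test-fault-tolerant-execution; the exact invocation depends on the workflow), activating the surefire profile added to the Hive pom below.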
26 changes: 22 additions & 4 deletions plugin/trino-hive/pom.xml
@@ -411,8 +411,8 @@
<excludes>
<exclude>**/TestHiveGlueMetastore.java</exclude>
<exclude>**/TestFullParquetReader.java</exclude>
<exclude>**/TestHiveFileFailureRecoveryTest.java</exclude>
<exclude>**/TestHiveMinioFailureRecoveryTest.java</exclude>
<exclude>**/TestHive*FailureRecoveryTest.java</exclude>
<exclude>**/TestHiveFaultTolerantExecution*.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -462,9 +462,27 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Failure recovery tests spend most of the time waiting for a retry -->
<threadCount>4</threadCount>
<includes>
<include>**/TestHiveFileFailureRecoveryTest.java</include>
<include>**/TestHiveMinioFailureRecoveryTest.java</include>
<include>**/TestHive*FailureRecoveryTest.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>

<profile>
<id>test-fault-tolerant-execution</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestHiveFaultTolerantExecution*.java</include>
</includes>
</configuration>
</plugin>
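Two things to note in the profiles above: the wildcard TestHive*FailureRecoveryTest.java replaces the two explicitly listed class names, so any new recovery test following that naming pattern is automatically excluded from the default run and picked up by the test-failure-recovery profile; the new test-fault-tolerant-execution profile applies the same pattern to TestHiveFaultTolerantExecution*.java.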
plugin/trino-hive/src/test/java/io/trino/plugin/hive/{TestHiveConnectorTest.java → BaseHiveConnectorTest.java}
@@ -171,24 +171,27 @@
import static org.testng.Assert.fail;
import static org.testng.FileAssert.assertFile;

public class TestHiveConnectorTest
public abstract class BaseHiveConnectorTest
extends BaseConnectorTest
{
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private final String catalog;
private final Session bucketedSession;
private final Map<String, String> extraProperties;

public TestHiveConnectorTest()
protected BaseHiveConnectorTest(Map<String, String> extraProperties)
{
this.catalog = HIVE_CATALOG;
this.bucketedSession = createBucketedSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin"))));
this.extraProperties = ImmutableMap.copyOf(requireNonNull(extraProperties, "extraProperties is null"));
}

@Override
protected QueryRunner createQueryRunner()
protected final QueryRunner createQueryRunner()
throws Exception
{
DistributedQueryRunner queryRunner = HiveQueryRunner.builder()
.setExtraProperties(extraProperties)
.setHiveProperties(ImmutableMap.of(
"hive.allow-register-partition-procedure", "true",
// Reduce writer sort buffer size to ensure SortingFileWriter gets used
@@ -1769,6 +1772,11 @@ public void testCreateTableWithUnsupportedType()

@Test
public void testTargetMaxFileSize()
{
testTargetMaxFileSize(3);
}

protected void testTargetMaxFileSize(int expectedTableWriters)
{
// We use TEXTFILE in this test because it has a very consistent and predictable size
@Language("SQL") String createTableSql = "CREATE TABLE test_max_file_size WITH (format = 'TEXTFILE') AS SELECT * FROM tpch.sf1.lineitem LIMIT 1000000";
@@ -1779,7 +1787,7 @@ public void testTargetMaxFileSize()
.setSystemProperty("task_writer_count", "1")
.build();
assertUpdate(session, createTableSql, 1000000);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(3);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters);
assertUpdate("DROP TABLE test_max_file_size");

// Write table with small limit and verify we get multiple files per node near the expected size
@@ -1792,7 +1800,7 @@

assertUpdate(session, createTableSql, 1000000);
MaterializedResult result = computeActual(selectFileInfo);
assertThat(result.getRowCount()).isGreaterThan(3);
assertThat(result.getRowCount()).isGreaterThan(expectedTableWriters);
for (MaterializedRow row : result) {
// allow a larger delta due to the very small max size and the relatively large writer chunk size
assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 3);
@@ -1803,6 +1811,11 @@

@Test
public void testTargetMaxFileSizePartitioned()
{
testTargetMaxFileSizePartitioned(3);
}

protected void testTargetMaxFileSizePartitioned(int expectedTableWriters)
{
// We use TEXTFILE in this test because it has a very consistent and predictable size
@Language("SQL") String createTableSql = "" +
@@ -1816,7 +1829,7 @@ public void testTargetMaxFileSizePartitioned()
.setSystemProperty("task_writer_count", "1")
.build();
assertUpdate(session, createTableSql, 1000000);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(9);
assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters * 3);
assertUpdate("DROP TABLE test_max_file_size");

// Write table with small limit and verify we get multiple files per node near the expected size
Expand All @@ -1829,7 +1842,7 @@ public void testTargetMaxFileSizePartitioned()

assertUpdate(session, createTableSql, 1000000);
MaterializedResult result = computeActual(selectFileInfo);
assertThat(result.getRowCount()).isGreaterThan(9);
assertThat(result.getRowCount()).isGreaterThan(expectedTableWriters * 3);
for (MaterializedRow row : result) {
// allow a larger delta due to the very small max size and the relatively large writer chunk size
assertThat((Long) row.getField(1)).isLessThan(maxSize.toBytes() * 3);
@@ -3661,7 +3674,7 @@ public void testScaleWriters()
testWithAllStorageFormats(this::testMultipleWriters);
}

private void testSingleWriter(Session session, HiveStorageFormat storageFormat)
protected void testSingleWriter(Session session, HiveStorageFormat storageFormat)
{
try {
// small table that will only have one writer
@@ -8404,7 +8417,7 @@ private static class RollbackException
{
}

private void testWithAllStorageFormats(BiConsumer<Session, HiveStorageFormat> test)
protected void testWithAllStorageFormats(BiConsumer<Session, HiveStorageFormat> test)
{
for (TestingHiveStorageFormat storageFormat : getAllTestingHiveStorageFormat()) {
testWithStorageFormat(storageFormat, test);
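With TestHiveConnectorTest generalized into the abstract BaseHiveConnectorTest, the plain (non-fault-tolerant) Hive run presumably becomes a thin concrete subclass; that file is among the changes not shown here. A minimal sketch, assuming it simply passes no extra coordinator properties:

package io.trino.plugin.hive;

import com.google.common.collect.ImmutableMap;

public class TestHiveConnectorTest
        extends BaseHiveConnectorTest
{
    public TestHiveConnectorTest()
    {
        // default execution mode: no fault-tolerant properties added
        super(ImmutableMap.of());
    }
}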
plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveFailureRecoveryTest.java
@@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import io.trino.Session;
import io.trino.operator.RetryPolicy;
import io.trino.testing.BaseFailureRecoveryTest;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;
@@ -27,6 +28,11 @@
public abstract class BaseHiveFailureRecoveryTest
extends BaseFailureRecoveryTest
{
protected BaseHiveFailureRecoveryTest(RetryPolicy retryPolicy)
{
super(retryPolicy);
}

@Override
protected boolean areWriteRetriesSupported()
{
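BaseHiveFailureRecoveryTest now declares which RetryPolicy it exercises via the constructor, and the concrete subclasses (matching the TestHive*FailureRecoveryTest pattern from the pom) pass the policy up. A sketch of such a subclass — the class name here is hypothetical, and the storage-specific setup is omitted:

package io.trino.plugin.hive;

import io.trino.operator.RetryPolicy;

public class TestHiveFileQueryFailureRecoveryTest
        extends BaseHiveFailureRecoveryTest
{
    public TestHiveFileQueryFailureRecoveryTest()
    {
        // exercise query-level retries; a sibling class could pass a different policy
        super(RetryPolicy.QUERY);
    }

    // storage-specific setup (query runner creation, etc.) omitted in this sketch
}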
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionAggregations.java
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.testing.AbstractTestFaultTolerantExecutionAggregations;
import io.trino.testing.QueryRunner;

import java.util.Map;

import static io.trino.tpch.TpchTable.getTables;

public class TestHiveFaultTolerantExecutionAggregations
extends AbstractTestFaultTolerantExecutionAggregations
{
@Override
protected QueryRunner createQueryRunner(Map<String, String> extraProperties)
throws Exception
{
return HiveQueryRunner.builder()
.setExtraProperties(extraProperties)
.setInitialTables(getTables())
.build();
}
}
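AbstractTestFaultTolerantExecutionAggregations itself is not part of this diff. A plausible sketch of how such a base class could wire the shared helper into the connector-provided runner — the class relationships and wiring are assumptions, not shown in the commit:

package io.trino.testing;

import java.util.Map;

public abstract class AbstractTestFaultTolerantExecutionAggregations
        extends AbstractTestAggregations
{
    @Override
    protected final QueryRunner createQueryRunner()
            throws Exception
    {
        // inject the shared fault-tolerant execution properties;
        // each connector only decides how to build its runner
        return createQueryRunner(FaultTolerantExecutionConnectorTestHelper.getExtraProperties());
    }

    protected abstract QueryRunner createQueryRunner(Map<String, String> extraProperties)
            throws Exception;
}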
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionConnectorTest.java
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.testing.FaultTolerantExecutionConnectorTestHelper;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestHiveFaultTolerantExecutionConnectorTest
extends BaseHiveConnectorTest
{
public TestHiveFaultTolerantExecutionConnectorTest()
{
super(FaultTolerantExecutionConnectorTestHelper.getExtraProperties());
}

@Override
public void testGroupedExecution()
{
// grouped execution is not supported (and not needed) with batch execution enabled
}

@Override
public void testScaleWriters()
{
testWithAllStorageFormats(this::testSingleWriter);
}

@Override
public void testTargetMaxFileSize()
{
testTargetMaxFileSize(1);
}

@Override
public void testTargetMaxFileSizePartitioned()
{
testTargetMaxFileSizePartitioned(1);
}

@Override
public void testOptimize()
{
assertThatThrownBy(super::testOptimize)
.hasMessageContaining("OPTIMIZE procedure is not supported with query retries enabled");
}

@Override
public void testOptimizeWithWriterScaling()
{
assertThatThrownBy(super::testOptimizeWithWriterScaling)
.hasMessageContaining("OPTIMIZE procedure is not supported with query retries enabled");
}

@Override
public void testOptimizeWithPartitioning()
{
assertThatThrownBy(super::testOptimizeWithPartitioning)
.hasMessageContaining("OPTIMIZE procedure is not supported with query retries enabled");
}

@Override
public void testOptimizeWithBucketing()
{
assertThatThrownBy(super::testOptimizeWithBucketing)
.hasMessageContaining("OPTIMIZE procedure is not supported with query retries enabled");
}

@Override
public void testOptimizeHiveInformationSchema()
{
assertThatThrownBy(super::testOptimizeHiveInformationSchema)
.hasMessageContaining("This connector does not support query retries");
}

@Override
public void testOptimizeHiveSystemTable()
{
assertThatThrownBy(super::testOptimizeHiveSystemTable)
.hasMessageContaining("This connector does not support query retries");
}
}
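FaultTolerantExecutionConnectorTestHelper.getExtraProperties() supplies the coordinator properties that flip the whole suite into fault-tolerant mode; the helper is not included in this commit. A hypothetical sketch of what it might return — the property names and values below are assumptions, not taken from the source:

package io.trino.testing;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

public final class FaultTolerantExecutionConnectorTestHelper
{
    private FaultTolerantExecutionConnectorTestHelper() {}

    public static Map<String, String> getExtraProperties()
    {
        return ImmutableMap.of(
                // hypothetical: enable query retries for every test query
                "retry-policy", "QUERY",
                // hypothetical: keep retry back-off short so tests don't sit idle
                "retry-initial-delay", "0s",
                "retry-max-delay", "1s");
    }
}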
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionJoinQueries.java
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.testing.AbstractTestFaultTolerantExecutionJoinQueries;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import java.util.Map;

import static io.trino.tpch.TpchTable.getTables;

public class TestHiveFaultTolerantExecutionJoinQueries
extends AbstractTestFaultTolerantExecutionJoinQueries
{
@Override
protected QueryRunner createQueryRunner(Map<String, String> extraProperties)
throws Exception
{
return HiveQueryRunner.builder()
.setExtraProperties(extraProperties)
.setInitialTables(getTables())
.build();
}

@Override
@Test(enabled = false)
public void testOutputDuplicatesInsensitiveJoin()
{
// flaky
}
}
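Overriding an inherited test method and re-annotating it with @Test(enabled = false) is the TestNG idiom for skipping a single flaky case without modifying the shared base class; the empty body plus comment records why it is disabled.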
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFaultTolerantExecutionOrderByQueries.java
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.testing.AbstractTestFaultTolerantExecutionOrderByQueries;
import io.trino.testing.QueryRunner;

import java.util.Map;

import static io.trino.tpch.TpchTable.getTables;

public class TestHiveFaultTolerantExecutionOrderByQueries
extends AbstractTestFaultTolerantExecutionOrderByQueries
{
@Override
protected QueryRunner createQueryRunner(Map<String, String> extraProperties)
throws Exception
{
return HiveQueryRunner.builder()
.setExtraProperties(extraProperties)
.setInitialTables(getTables())
.build();
}
}