From f712fe73e27d797a0a50c36121afa26cf3206906 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 28 Jan 2022 13:10:13 +0100 Subject: [PATCH] Test concurrent metadata access E.g. `SHOW TABLES` must not fail when tables, views or materialized views are concurrently created or dropped. --- .../iceberg/BaseIcebergConnectorTest.java | 8 + .../kudu/AbstractKuduConnectorTest.java | 7 + plugin/trino-phoenix/pom.xml | 6 + .../phoenix/TestPhoenixConnectorTest.java | 25 +++ plugin/trino-phoenix5/pom.xml | 6 + .../phoenix5/TestPhoenixConnectorTest.java | 25 +++ plugin/trino-sqlserver/pom.xml | 6 + .../sqlserver/BaseSqlServerConnectorTest.java | 31 +++ .../io/trino/testing/BaseConnectorTest.java | 189 ++++++++++++++++++ 9 files changed, 303 insertions(+) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 5373b3db41ac..2540612aceda 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -302,6 +302,14 @@ protected void checkInformationSchemaViewsForMaterializedView(String schemaName, .hasMessageFindingMatch("(?s)Expecting.*to contain:.*\\Q[(" + viewName + ")]"); } + @Test(enabled = false) // TODO fails + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + throws Exception + { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + @Test public void testDecimal() { diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java index 50e140522864..95ef84459653 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java @@ -358,6 +358,13 @@ public void testWrittenStats() throw new SkipException("TODO"); } + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + // TODO Support these test once kudu connector can create tables with default partitions + throw new SkipException("TODO"); + } + @Test @Override public void testCreateTableAsSelectNegativeDate() diff --git a/plugin/trino-phoenix/pom.xml b/plugin/trino-phoenix/pom.xml index ca0772c3e411..016913db8c43 100644 --- a/plugin/trino-phoenix/pom.xml +++ b/plugin/trino-phoenix/pom.xml @@ -190,6 +190,12 @@ + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java index 8a58c68a844c..5dc2c7184dc0 100644 --- a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java @@ -22,6 +22,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -220,6 +221,30 @@ public void testShowCreateTable() ")"); } + // TODO (https://github.com/trinodb/trino/issues/10904): Test is flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10904", match = "\\QERROR 1012 (42M03): Table undefined. tableName=") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10904): shouldn't fail + assertThat(expected) + .hasMessageContaining("ERROR 1012 (42M03): Table undefined. tableName="); + throw new SkipException("to be fixed"); + } + } + @Override public void testCharVarcharComparison() { diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index 601a1362881a..125bab70e914 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -195,6 +195,12 @@ + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java index 1107244bff7b..1840e546c448 100644 --- a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java @@ -24,6 +24,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -238,6 +239,30 @@ public void testShowCreateTable() ")"); } + // TODO (https://github.com/trinodb/trino/issues/10904): Test is flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10904", match = "\\QERROR 1012 (42M03): Table undefined. tableName=") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10904): shouldn't fail + assertThat(expected) + .hasMessageContaining("ERROR 1012 (42M03): Table undefined. tableName="); + throw new SkipException("to be fixed"); + } + } + @Override public void testCharVarcharComparison() { diff --git a/plugin/trino-sqlserver/pom.xml b/plugin/trino-sqlserver/pom.xml index e8e2fcc425c5..6413502a7f59 100644 --- a/plugin/trino-sqlserver/pom.xml +++ b/plugin/trino-sqlserver/pom.xml @@ -140,6 +140,12 @@ test + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java index 04b7415f5b3c..0c565268a844 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java @@ -25,6 +25,8 @@ import io.trino.sql.planner.plan.FilterNode; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; +import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -135,6 +137,35 @@ public void testReadFromView() onRemoteDatabase().execute("DROP VIEW IF EXISTS test_view"); } + // TODO (https://github.com/trinodb/trino/issues/10846): Test is expected to be flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10846", match = "was deadlocked on lock resources with another process and has been chosen as the deadlock victim") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Test + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10846): shouldn't fail + assertThat(expected) + .hasMessageMatching("(?s).*(" + + "No task completed before timeout|" + + "was deadlocked on lock resources with another process and has been chosen as the deadlock victim|" + + // E.g. system.metadata.table_comments can return empty results, when underlying metadata list tables call fails + "Expecting actual not to be empty).*"); + throw new SkipException("to be fixed"); + } + } + @Test public void testColumnComment() { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 2198893302d6..eb7cce7b27c0 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -13,6 +13,7 @@ */ package io.trino.testing; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.UncheckedTimeoutException; @@ -34,11 +35,19 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.regex.Matcher; @@ -47,6 +56,7 @@ import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; @@ -105,6 +115,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * Generic test for connectors. @@ -1199,6 +1210,184 @@ protected void checkInformationSchemaViewsForMaterializedView(String schemaName, .containsAll("VALUES '" + viewName + "'"); } + /** + * Test that reading table, column metadata, like {@code SHOW TABLES} or reading from {@code information_schema.views} + * does not fail when relations are concurrently created or dropped. + */ + @Test(timeOut = 180_000) + public void testReadMetadataWithRelationsConcurrentModifications() + throws Exception + { + if (!hasBehavior(SUPPORTS_CREATE_TABLE) && !hasBehavior(SUPPORTS_CREATE_VIEW) && !hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + throw new SkipException("Cannot test"); + } + + int readIterations = 5; + // generous timeout as this is a generic test; typically should be faster + int testTimeoutSeconds = 150; + + testReadMetadataWithRelationsConcurrentModifications(readIterations, testTimeoutSeconds); + } + + protected void testReadMetadataWithRelationsConcurrentModifications(int readIterations, int testTimeoutSeconds) + throws Exception + { + Stopwatch testWatch = Stopwatch.createStarted(); + + int readerTasksCount = 6 + + (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0) + + (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0); + AtomicInteger incompleteReadTasks = new AtomicInteger(readerTasksCount); + List> readerTasks = new ArrayList<>(); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SHOW TABLES")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.tables WHERE table_schema = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.tables WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.columns WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.table_comments WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA")); + if (hasBehavior(SUPPORTS_CREATE_VIEW)) { + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.views WHERE table_schema = CURRENT_SCHEMA")); + } + if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA")); + } + assertEquals(readerTasks.size(), readerTasksCount); + + int writeTasksCount = 1 + + (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0) + + (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0); + writeTasksCount = 2 * writeTasksCount; // writes are scheduled twice + CountDownLatch writeTasksInitialized = new CountDownLatch(writeTasksCount); + Runnable writeInitialized = writeTasksInitialized::countDown; + Supplier done = () -> incompleteReadTasks.get() == 0; + List> writeTasks = new ArrayList<>(); + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_table", "CREATE TABLE %s(a integer)", "DROP TABLE %s")); + if (hasBehavior(SUPPORTS_CREATE_VIEW)) { + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_view", "CREATE VIEW %s AS SELECT 1 a", "DROP VIEW %s")); + } + if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_mview", "CREATE MATERIALIZED VIEW %s AS SELECT 1 a", "DROP MATERIALIZED VIEW %s")); + } + assertEquals(writeTasks.size() * 2, writeTasksCount); + + ExecutorService executor = newFixedThreadPool(readerTasksCount + writeTasksCount); + try { + CompletionService completionService = new ExecutorCompletionService<>(executor); + submitTasks(writeTasks, completionService); + submitTasks(writeTasks, completionService); // twice to increase chances of catching problems + if (!writeTasksInitialized.await(testTimeoutSeconds, SECONDS)) { + Future someFailure = completionService.poll(); + if (someFailure != null) { + someFailure.get(); // non-blocking + } + fail("Setup failed"); + } + submitTasks(readerTasks, completionService); + for (int i = 0; i < readerTasksCount + writeTasksCount; i++) { + long remainingTimeSeconds = testTimeoutSeconds - testWatch.elapsed(SECONDS); + Future future = completionService.poll(remainingTimeSeconds, SECONDS); + verifyNotNull(future, "Task did not completed before timeout; completed tasks: %s, current poll timeout: %s s", i, remainingTimeSeconds); + future.get(); // non-blocking + } + } + finally { + executor.shutdownNow(); + } + assertTrue(executor.awaitTermination(10, SECONDS)); + } + + /** + * Run {@code sql} query at least {@code minIterations} times and keep running until other tasks complete. + * {@code incompleteReadTasks} is used for orchestrating end of execution. + */ + protected Callable queryRepeatedly(int minIterations, AtomicInteger incompleteReadTasks, @Language("SQL") String sql) + { + return new Callable<>() + { + @Override + public Void call() + { + boolean alwaysEmpty = true; + for (int i = 0; i < minIterations; i++) { + MaterializedResult result = computeActual(sql); + alwaysEmpty &= result.getRowCount() == 0; + } + if (alwaysEmpty) { + fail(format("The results of [%s] are always empty after %s iterations, this may indicate test misconfiguration or broken connector behavior", sql, minIterations)); + } + assertThat(incompleteReadTasks.decrementAndGet()).as("incompleteReadTasks").isGreaterThanOrEqualTo(0); + // Keep running so that faster test queries have same length of exposure in wall time + while (incompleteReadTasks.get() != 0) { + computeActual(sql); + } + return null; + } + + @Override + public String toString() + { + return format("Query(%s)", sql); + } + }; + } + + protected Callable createDropRepeatedly(Runnable initReady, Supplier done, String namePrefix, String createTemplate, String dropTemplate) + { + return new Callable<>() + { + @Override + public Void call() + { + int objectsToKeep = 3; + Deque liveObjects = new ArrayDeque<>(objectsToKeep); + for (int i = 0; i < objectsToKeep; i++) { + String name = namePrefix + "_" + randomTableSuffix(); + assertUpdate(format(createTemplate, name)); + liveObjects.addLast(name); + } + initReady.run(); + while (!done.get()) { + assertUpdate(format(dropTemplate, liveObjects.removeFirst())); + String name = namePrefix + "_" + randomTableSuffix(); + assertUpdate(format(createTemplate, name)); + liveObjects.addLast(name); + } + while (!liveObjects.isEmpty()) { + assertUpdate(format(dropTemplate, liveObjects.removeFirst())); + } + return null; + } + + @Override + public String toString() + { + return format("Repeat (%s) and (%s)", createTemplate, dropTemplate); + } + }; + } + + protected void submitTasks(List> callables, CompletionService completionService) + { + for (Callable callable : callables) { + String taskDescription = callable.toString(); + completionService.submit(new Callable() + { + @Override + public T call() + throws Exception + { + try { + return callable.call(); + } + catch (Throwable e) { + e.addSuppressed(new Exception("Task: " + taskDescription)); + throw e; + } + } + }); + } + } + @Test public void testExplainAnalyze() {