From 93a01784cc8a2b54705215f477dedf771043dbff Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 7 Jun 2023 17:01:08 +0200 Subject: [PATCH 1/4] Flink: Ignore the Forbidden when creating a database It can be that the user that runs a Flink job, doesn't have the privileges to create a database. In that case we just assume that it already exists. --- .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++ .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++ .../src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 3ecddaa05c9b..1c22cf7b71e6 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,6 +67,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -130,6 +131,9 @@ public void open() throws CatalogException { createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); } catch (DatabaseAlreadyExistException e) { // Ignore the exception if it's already exist. + } catch (ForbiddenException e) { + // Ignore if the user doesn't have CREATE DATABASE permissions, + // and we assume that the database already exists } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 3ecddaa05c9b..1c22cf7b71e6 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,6 +67,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -130,6 +131,9 @@ public void open() throws CatalogException { createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); } catch (DatabaseAlreadyExistException e) { // Ignore the exception if it's already exist. + } catch (ForbiddenException e) { + // Ignore if the user doesn't have CREATE DATABASE permissions, + // and we assume that the database already exists } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 3ecddaa05c9b..1c22cf7b71e6 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,6 +67,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -130,6 +131,9 @@ public void open() throws CatalogException { createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); } catch (DatabaseAlreadyExistException e) { // Ignore the exception if it's already exist. + } catch (ForbiddenException e) { + // Ignore if the user doesn't have CREATE DATABASE permissions, + // and we assume that the database already exists } } From 4269e502600ebbfa98aa589b1102eb6eaa7ac69b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 15 Jun 2023 22:41:28 +0200 Subject: [PATCH 2/4] Check for the database instead --- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } From 26301d0bd8f4fa82a22969353dee1efc7fc2c9a4 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 24 Jun 2023 08:29:01 +0200 Subject: [PATCH 3/4] Remove the create database --- .../java/org/apache/iceberg/flink/FlinkCatalog.java | 12 +----------- .../java/org/apache/iceberg/flink/FlinkCatalog.java | 12 +----------- .../java/org/apache/iceberg/flink/FlinkCatalog.java | 12 +----------- 3 files changed, 3 insertions(+), 33 deletions(-) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 16331f5ab1ea..825816fdf416 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -124,17 +124,7 @@ public FlinkCatalog( } @Override - public void open() throws CatalogException { - // Create the default database if it does not exist. - String defaultDatabase = getDefaultDatabase(); - if (!databaseExists(defaultDatabase)) { - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } - } - } + public void open() throws CatalogException {} @Override public void close() throws CatalogException { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 16331f5ab1ea..825816fdf416 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -124,17 +124,7 @@ public FlinkCatalog( } @Override - public void open() throws CatalogException { - // Create the default database if it does not exist. - String defaultDatabase = getDefaultDatabase(); - if (!databaseExists(defaultDatabase)) { - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } - } - } + public void open() throws CatalogException {} @Override public void close() throws CatalogException { diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 16331f5ab1ea..825816fdf416 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -124,17 +124,7 @@ public FlinkCatalog( } @Override - public void open() throws CatalogException { - // Create the default database if it does not exist. - String defaultDatabase = getDefaultDatabase(); - if (!databaseExists(defaultDatabase)) { - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } - } - } + public void open() throws CatalogException {} @Override public void close() throws CatalogException { From 67fc16f9c00d600f3c88d8294a7fdefa582b06c8 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 26 Jun 2023 10:14:40 +0200 Subject: [PATCH 4/4] Update the tests --- .../flink/TestFlinkCatalogDatabase.java | 51 +++++++------------ .../flink/TestFlinkCatalogDatabase.java | 51 +++++++------------ .../flink/TestFlinkCatalogDatabase.java | 27 ++-------- 3 files changed, 38 insertions(+), 91 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index a19e5f72bbe9..47b47cb6262d 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -24,12 +24,11 @@ import java.util.Objects; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.types.Row; -import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -74,19 +73,6 @@ public void testCreateNamespace() { "Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace)); } - @Test - public void testDefaultDatabase() { - sql("USE CATALOG %s", catalogName); - sql("SHOW TABLES"); - - Assert.assertEquals( - "Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName); - Assert.assertEquals( - "Should use the configured default namespace", - getTableEnv().getCurrentDatabase(), - "default"); - } - @Test public void testDropEmptyDatabase() { Assert.assertFalse( @@ -126,11 +112,11 @@ public void testDropNonEmptyNamespace() { "Table should exist", validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))); - AssertHelpers.assertThrowsCause( - "Should fail if trying to delete a non-empty database", - DatabaseNotEmptyException.class, - String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName), - () -> sql("DROP DATABASE %s", flinkDatabase)); + Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase)) + .cause() + .isInstanceOf(DatabaseNotEmptyException.class) + .hasMessage( + String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName)); sql("DROP TABLE %s.tl", flinkDatabase); } @@ -174,22 +160,17 @@ public void testListNamespace() { List databases = sql("SHOW DATABASES"); if (isHadoopCatalog) { - Assert.assertEquals("Should have 2 database", 2, databases.size()); - Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + Assert.assertEquals("Should have 1 database", 1, databases.size()); + Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0)); if (!baseNamespace.isEmpty()) { // test namespace not belongs to this catalog validationNamespaceCatalog.createNamespace( Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE")); databases = sql("SHOW DATABASES"); - Assert.assertEquals("Should have 2 database", 2, databases.size()); + Assert.assertEquals("Should have 1 database", 1, databases.size()); Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + "Should have db and default database", "db", databases.get(0).getField(0)); } } else { // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the @@ -301,10 +282,12 @@ public void testHadoopNotSupportMeta() { "Namespace should not already exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - AssertHelpers.assertThrowsCause( - "Should fail if trying to create database with location in hadoop catalog.", - UnsupportedOperationException.class, - String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace), - () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)); + Assertions.assertThatThrownBy( + () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)) + .cause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + String.format( + "Cannot create namespace %s: metadata is not supported", icebergNamespace)); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index a19e5f72bbe9..47b47cb6262d 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -24,12 +24,11 @@ import java.util.Objects; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.types.Row; -import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -74,19 +73,6 @@ public void testCreateNamespace() { "Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace)); } - @Test - public void testDefaultDatabase() { - sql("USE CATALOG %s", catalogName); - sql("SHOW TABLES"); - - Assert.assertEquals( - "Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName); - Assert.assertEquals( - "Should use the configured default namespace", - getTableEnv().getCurrentDatabase(), - "default"); - } - @Test public void testDropEmptyDatabase() { Assert.assertFalse( @@ -126,11 +112,11 @@ public void testDropNonEmptyNamespace() { "Table should exist", validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))); - AssertHelpers.assertThrowsCause( - "Should fail if trying to delete a non-empty database", - DatabaseNotEmptyException.class, - String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName), - () -> sql("DROP DATABASE %s", flinkDatabase)); + Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase)) + .cause() + .isInstanceOf(DatabaseNotEmptyException.class) + .hasMessage( + String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName)); sql("DROP TABLE %s.tl", flinkDatabase); } @@ -174,22 +160,17 @@ public void testListNamespace() { List databases = sql("SHOW DATABASES"); if (isHadoopCatalog) { - Assert.assertEquals("Should have 2 database", 2, databases.size()); - Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + Assert.assertEquals("Should have 1 database", 1, databases.size()); + Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0)); if (!baseNamespace.isEmpty()) { // test namespace not belongs to this catalog validationNamespaceCatalog.createNamespace( Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE")); databases = sql("SHOW DATABASES"); - Assert.assertEquals("Should have 2 database", 2, databases.size()); + Assert.assertEquals("Should have 1 database", 1, databases.size()); Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + "Should have db and default database", "db", databases.get(0).getField(0)); } } else { // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the @@ -301,10 +282,12 @@ public void testHadoopNotSupportMeta() { "Namespace should not already exist", validationNamespaceCatalog.namespaceExists(icebergNamespace)); - AssertHelpers.assertThrowsCause( - "Should fail if trying to create database with location in hadoop catalog.", - UnsupportedOperationException.class, - String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace), - () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)); + Assertions.assertThatThrownBy( + () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase)) + .cause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + String.format( + "Cannot create namespace %s: metadata is not supported", icebergNamespace)); } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index f97ace69f8f9..47b47cb6262d 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -27,7 +27,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; import org.junit.After; @@ -74,19 +73,6 @@ public void testCreateNamespace() { "Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace)); } - @Test - public void testDefaultDatabase() { - sql("USE CATALOG %s", catalogName); - sql("SHOW TABLES"); - - Assert.assertEquals( - "Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName); - Assert.assertEquals( - "Should use the configured default namespace", - getTableEnv().getCurrentDatabase(), - "default"); - } - @Test public void testDropEmptyDatabase() { Assert.assertFalse( @@ -174,22 +160,17 @@ public void testListNamespace() { List databases = sql("SHOW DATABASES"); if (isHadoopCatalog) { - Assert.assertEquals("Should have 2 database", 2, databases.size()); - Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + Assert.assertEquals("Should have 1 database", 1, databases.size()); + Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0)); if (!baseNamespace.isEmpty()) { // test namespace not belongs to this catalog validationNamespaceCatalog.createNamespace( Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE")); databases = sql("SHOW DATABASES"); - Assert.assertEquals("Should have 2 database", 2, databases.size()); + Assert.assertEquals("Should have 1 database", 1, databases.size()); Assert.assertEquals( - "Should have db and default database", - Sets.newHashSet("default", "db"), - Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0))); + "Should have db and default database", "db", databases.get(0).getField(0)); } } else { // If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the