diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } }