From 4269e502600ebbfa98aa589b1102eb6eaa7ac69b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 15 Jun 2023 22:41:28 +0200 Subject: [PATCH] Check for the database instead --- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- .../org/apache/iceberg/flink/FlinkCatalog.java | 15 +++++++-------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 1c22cf7b71e6..16331f5ab1ea 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -67,7 +67,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; @@ -127,13 +126,13 @@ public FlinkCatalog( @Override public void open() throws CatalogException { // Create the default database if it does not exist. - try { - createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); - } catch (DatabaseAlreadyExistException e) { - // Ignore the exception if it's already exist. - } catch (ForbiddenException e) { - // Ignore if the user doesn't have CREATE DATABASE permissions, - // and we assume that the database already exists + String defaultDatabase = getDefaultDatabase(); + if (!databaseExists(defaultDatabase)) { + try { + createDatabase(getDefaultDatabase(), ImmutableMap.of(), true); + } catch (DatabaseAlreadyExistException e) { + // Ignore the exception if it's already exist. + } } }