Kafka Connect sink: create missing namespaces before auto-creating a table.

IcebergWriterFactory.autoCreateTable now calls createNamespaceIfNotExist, which walks the
table identifier's namespace from the outermost level inward and attempts to create each
level when the catalog implements SupportsNamespaces. The test switches to a three-level
namespace and verifies the three createNamespace calls in order.

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 67d0e850e62e..47dcddcb9925 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -18,14 +18,18 @@
  */
 package org.apache.iceberg.connect.data;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.types.Type;
@@ -83,6 +87,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
     org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
     TableIdentifier identifier = TableIdentifier.parse(tableName);
 
+    createNamespaceIfNotExist(catalog, identifier.namespace());
+
     List<String> partitionBy = config.tableConfig(tableName).partitionBy();
     PartitionSpec spec;
     try {
@@ -112,4 +118,32 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
             });
     return result.get();
   }
+
+  /**
+   * Creates every level of the given namespace, outermost first, when the catalog supports
+   * namespaces; does nothing otherwise.
+   *
+   * <p>Creation is attempted unconditionally for each level; {@link AlreadyExistsException} and
+   * {@link ForbiddenException} are ignored.
+   *
+   * @param catalog catalog in which to create the namespaces
+   * @param identifierNamespace namespace whose levels should be created
+   */
+  @VisibleForTesting
+  static void createNamespaceIfNotExist(Catalog catalog, Namespace identifierNamespace) {
+    if (!(catalog instanceof SupportsNamespaces)) {
+      return;
+    }
+
+    String[] levels = identifierNamespace.levels();
+    for (int index = 0; index < levels.length; index++) {
+      Namespace namespace = Namespace.of(Arrays.copyOfRange(levels, 0, index + 1));
+      try {
+        ((SupportsNamespaces) catalog).createNamespace(namespace);
+      } catch (AlreadyExistsException | ForbiddenException ex) {
+        // Deliberately ignored: creation is attempted unconditionally, even when the
+        // namespace already exists, to avoid a separate namespaceExists() check.
+      }
+    }
+  }
 }
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
index 93d1d2fa6bea..ab8bbdd02c23 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
@@ -21,13 +21,18 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.connect.TableSinkConfig;
@@ -47,7 +52,7 @@ public class IcebergWriterFactoryTest {
   @ValueSource(booleans = {true, false})
   @SuppressWarnings("unchecked")
   public void testAutoCreateTable(boolean partitioned) {
-    Catalog catalog = mock(Catalog.class);
+    Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
     when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
 
     TableSinkConfig tableConfig = mock(TableSinkConfig.class);
@@ -63,7 +68,7 @@ public void testAutoCreateTable(boolean partitioned) {
     when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2"));
 
     IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
-    factory.autoCreateTable("db.tbl", record);
+    factory.autoCreateTable("foo1.foo2.foo3.bar", record);
 
     ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
     ArgumentCaptor<Schema> schemaCaptor = ArgumentCaptor.forClass(Schema.class);
@@ -77,10 +82,18 @@ public void testAutoCreateTable(boolean partitioned) {
             specCaptor.capture(),
             propsCaptor.capture());
 
-    assertThat(identCaptor.getValue()).isEqualTo(TableIdentifier.of("db", "tbl"));
+    assertThat(identCaptor.getValue())
+        .isEqualTo(TableIdentifier.of(Namespace.of("foo1", "foo2", "foo3"), "bar"));
     assertThat(schemaCaptor.getValue().findField("id").type()).isEqualTo(LongType.get());
     assertThat(schemaCaptor.getValue().findField("data").type()).isEqualTo(StringType.get());
     assertThat(specCaptor.getValue().isPartitioned()).isEqualTo(partitioned);
     assertThat(propsCaptor.getValue()).containsKey("test-prop");
+
+    ArgumentCaptor<Namespace> namespaceCaptor = ArgumentCaptor.forClass(Namespace.class);
+    verify((SupportsNamespaces) catalog, times(3)).createNamespace(namespaceCaptor.capture());
+    List<Namespace> capturedArguments = namespaceCaptor.getAllValues();
+    assertThat(capturedArguments.get(0)).isEqualTo(Namespace.of("foo1"));
+    assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
+    assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
   }
 }