diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index abda38adf5e8..33b9e7f3896c 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -264,9 +264,9 @@ class ObjectStoragePathFactory( * * path: "{STREAM_NAME}/foo/" + "{part_number}{format_extension}" => "my_stream/foo/1.json" * * path: "{STREAM_NAME}/foo" + "{part_number}{format_extension}" => "my_stream/foo1.json" */ - private fun resolveRetainingTerminalSlash(prefix: String, path: String): String { - val asPath = Paths.get(prefix, path) - return if (path.endsWith('/')) { + private fun resolveRetainingTerminalSlash(prefix: String, suffix: String = ""): String { + val asPath = Paths.get(prefix, suffix) + return if ("$prefix$suffix".endsWith('/')) { "$asPath/" } else { asPath.toString() @@ -277,24 +277,28 @@ class ObjectStoragePathFactory( stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - return getFormattedPath( - stream, - if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT - else PATH_VARIABLES, - isStaging = true - ) + val path = + getFormattedPath( + stream, + if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT + else PATH_VARIABLES, + isStaging = true + ) + return resolveRetainingTerminalSlash(path) } override fun getFinalDirectory( stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - return getFormattedPath( - stream, - if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT - else PATH_VARIABLES, - isStaging = false - ) + val path = + getFormattedPath( + stream, + if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT + else PATH_VARIABLES, + isStaging = false + ) + return resolveRetainingTerminalSlash(path) } override fun getLongestStreamConstantPrefix( @@ -401,7 +405,7 @@ class ObjectStoragePathFactory( // NOTE the old code does not actually resolve the path + filename, // even tho the documentation says it does. val replacedForPathWithEmptyVariablesRemoved = - resolveRetainingTerminalSlash("", replacedForPath) + resolveRetainingTerminalSlash(replacedForPath) val combined = "$replacedForPathWithEmptyVariablesRemoved$replacedForFile" val withSuffix = if (suffixPattern != null) { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt index 1666d0729740..c6f804fc2a8c 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration import io.airbyte.cdk.load.file.TimeProvider import io.mockk.every import io.mockk.impl.annotations.MockK +import io.mockk.mockk import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertNull @@ -85,4 +86,31 @@ class ObjectStoragePathFactoryUTest { assertNotNull(matcher.match("prefix-test/stream/any_filename")) assertNotNull(matcher.match("prefix-test/stream/any_filename-foo")) } + + @Test + fun `test pattern from null namespace`() { + every { pathConfigProvider.objectStoragePathConfiguration } returns + ObjectStoragePathConfiguration( + "prefix", + "staging", + "\${NAMESPACE}/\${STREAM_NAME}/", + "any_filename", + true, + ) + val streamWithNullNamespace = mockk() + every { streamWithNullNamespace.descriptor } returns + DestinationStream.Descriptor(null, "stream") + val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) + assertEquals( + "prefix/stream/any_filename", + factory.getPathToFile(streamWithNullNamespace, 0L, isStaging = false) + ) + assertEquals( + "staging/stream/any_filename", + factory.getPathToFile(streamWithNullNamespace, 0L, isStaging = true) + ) + + val matcher = factory.getPathMatcher(streamWithNullNamespace, "(-foo)?") + assertNotNull(matcher.match("prefix/stream/any_filename")) + } }