Skip to content

Commit

Permalink
Destination S3-V2: Resolve away null namespaces (#51049)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Jan 10, 2025
1 parent 64f591f commit 70fd1cf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ class ObjectStoragePathFactory(
* * path: "{STREAM_NAME}/foo/" + "{part_number}{format_extension}" => "my_stream/foo/1.json"
* * path: "{STREAM_NAME}/foo" + "{part_number}{format_extension}" => "my_stream/foo1.json"
*/
private fun resolveRetainingTerminalSlash(prefix: String, path: String): String {
val asPath = Paths.get(prefix, path)
return if (path.endsWith('/')) {
private fun resolveRetainingTerminalSlash(prefix: String, suffix: String = ""): String {
val asPath = Paths.get(prefix, suffix)
return if ("$prefix$suffix".endsWith('/')) {
"$asPath/"
} else {
asPath.toString()
Expand All @@ -277,24 +277,28 @@ class ObjectStoragePathFactory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = true
)
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = true
)
return resolveRetainingTerminalSlash(path)
}

override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = false
)
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = false
)
return resolveRetainingTerminalSlash(path)
}

override fun getLongestStreamConstantPrefix(
Expand Down Expand Up @@ -401,7 +405,7 @@ class ObjectStoragePathFactory(
// NOTE the old code does not actually resolve the path + filename,
// even tho the documentation says it does.
val replacedForPathWithEmptyVariablesRemoved =
resolveRetainingTerminalSlash("", replacedForPath)
resolveRetainingTerminalSlash(replacedForPath)
val combined = "$replacedForPathWithEmptyVariablesRemoved$replacedForFile"
val withSuffix =
if (suffixPattern != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration
import io.airbyte.cdk.load.file.TimeProvider
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
Expand Down Expand Up @@ -85,4 +86,31 @@ class ObjectStoragePathFactoryUTest {
assertNotNull(matcher.match("prefix-test/stream/any_filename"))
assertNotNull(matcher.match("prefix-test/stream/any_filename-foo"))
}

@Test
fun `test pattern from null namespace`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
"staging",
"\${NAMESPACE}/\${STREAM_NAME}/",
"any_filename",
true,
)
val streamWithNullNamespace = mockk<DestinationStream>()
every { streamWithNullNamespace.descriptor } returns
DestinationStream.Descriptor(null, "stream")
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
assertEquals(
"prefix/stream/any_filename",
factory.getPathToFile(streamWithNullNamespace, 0L, isStaging = false)
)
assertEquals(
"staging/stream/any_filename",
factory.getPathToFile(streamWithNullNamespace, 0L, isStaging = true)
)

val matcher = factory.getPathMatcher(streamWithNullNamespace, "(-foo)?")
assertNotNull(matcher.match("prefix/stream/any_filename"))
}
}

0 comments on commit 70fd1cf

Please sign in to comment.