From 05b6d0bf51bbb53339d89adb4ed503022a53193c Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 10 Jan 2025 11:44:05 -0800 Subject: [PATCH] Destination S3-V2: Bug: Honor path variables in bucket prefix (#51039) --- .../ObjectStoragePathFactory.kt | 98 ++++++++++--------- .../ObjectStoragePathFactoryUTest.kt | 37 +++++++ .../ObjectStorageDestinationStateTest.kt | 2 +- .../io/airbyte/cdk/load/MockPathFactory.kt | 8 +- 4 files changed, 94 insertions(+), 51 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index 36b1d9778a26..abda38adf5e8 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -39,7 +39,7 @@ interface PathFactory { fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher val supportsStaging: Boolean - val prefix: String + val finalPrefix: String } data class PathMatcher(val regex: Regex, val variableToIndex: Map) { @@ -63,23 +63,14 @@ class ObjectStoragePathFactory( compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null, private val timeProvider: TimeProvider, ) : PathFactory { + // Resolved configuration private val pathConfig = pathConfigProvider.objectStoragePathConfiguration + override val supportsStaging: Boolean = pathConfig.usesStagingDirectory + + // Resolved bucket path prefixes private val stagingPrefixResolved = pathConfig.stagingPrefix ?: Paths.get(pathConfig.prefix, DEFAULT_STAGING_PREFIX_SUFFIX).toString() - private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT - private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT - private val fileFormatExtension = - formatConfigProvider?.objectStorageFormatConfiguration?.extension - private val compressionExtension = - compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension - private val defaultExtension = - if (fileFormatExtension != null && compressionExtension != null) { - "$fileFormatExtension.$compressionExtension" - } else { - fileFormatExtension ?: compressionExtension - } - private val stagingPrefix: String get() = if (!pathConfig.usesStagingDirectory) { @@ -89,15 +80,29 @@ class ObjectStoragePathFactory( } else { stagingPrefixResolved } - - override val supportsStaging: Boolean = pathConfig.usesStagingDirectory - override val prefix: String = + override val finalPrefix: String = if (pathConfig.prefix.endsWith('/')) { pathConfig.prefix.take(pathConfig.prefix.length - 1) } else { pathConfig.prefix } + // Resolved path and filename patterns + private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT + private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT + + // Resolved file extensions + private val fileFormatExtension = + formatConfigProvider?.objectStorageFormatConfiguration?.extension + private val compressionExtension = + compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension + private val defaultExtension = + if (fileFormatExtension != null && compressionExtension != null) { + "$fileFormatExtension.$compressionExtension" + } else { + fileFormatExtension ?: compressionExtension + } + /** * Variable substitution is complex. * @@ -252,10 +257,17 @@ class ObjectStoragePathFactory( } } + /** + * This is to maintain parity with legacy code. Whether the path pattern ends with "/" is + * significant. + * + * * path: "{STREAM_NAME}/foo/" + "{part_number}{format_extension}" => "my_stream/foo/1.json" + * * path: "{STREAM_NAME}/foo" + "{part_number}{format_extension}" => "my_stream/foo1.json" + */ private fun resolveRetainingTerminalSlash(prefix: String, path: String): String { val asPath = Paths.get(prefix, path) return if (path.endsWith('/')) { - asPath.toString() + "/" + "$asPath/" } else { asPath.toString() } @@ -265,26 +277,24 @@ class ObjectStoragePathFactory( stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - val path = - getFormattedPath( - stream, - if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT - else PATH_VARIABLES - ) - return resolveRetainingTerminalSlash(stagingPrefix, path) + return getFormattedPath( + stream, + if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT + else PATH_VARIABLES, + isStaging = true + ) } override fun getFinalDirectory( stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - val path = - getFormattedPath( - stream, - if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT - else PATH_VARIABLES - ) - return resolveRetainingTerminalSlash(prefix, path) + return getFormattedPath( + stream, + if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT + else PATH_VARIABLES, + isStaging = false + ) } override fun getLongestStreamConstantPrefix( @@ -323,9 +333,11 @@ class ObjectStoragePathFactory( private fun getFormattedPath( stream: DestinationStream, - variables: List = PATH_VARIABLES + variables: List = PATH_VARIABLES, + isStaging: Boolean ): String { - val pattern = pathPatternResolved + val selectedPrefix = if (isStaging) stagingPrefix else finalPrefix + val pattern = resolveRetainingTerminalSlash(selectedPrefix, pathPatternResolved) val context = VariableContext(stream) return variables.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) } } @@ -375,13 +387,10 @@ class ObjectStoragePathFactory( val pathVariableToPattern = getPathVariableToPattern(stream) val variableToIndex = mutableMapOf() + val pathPattern = resolveRetainingTerminalSlash(finalPrefix, pathPatternResolved) + val replacedForPath = - buildPattern( - pathPatternResolved, - """\\\$\{(\w+)}""", - pathVariableToPattern, - variableToIndex - ) + buildPattern(pathPattern, """\\\$\{(\w+)}""", pathVariableToPattern, variableToIndex) val replacedForFile = buildPattern( filePatternResolved, @@ -391,12 +400,9 @@ class ObjectStoragePathFactory( ) // NOTE the old code does not actually resolve the path + filename, // even tho the documentation says it does. - val combined = - if (replacedForPath.startsWith('/')) { - "${prefix}$replacedForPath$replacedForFile" - } else { - "$prefix/$replacedForPath$replacedForFile" - } + val replacedForPathWithEmptyVariablesRemoved = + resolveRetainingTerminalSlash("", replacedForPath) + val combined = "$replacedForPathWithEmptyVariablesRemoved$replacedForFile" val withSuffix = if (suffixPattern != null) { variableToIndex["suffix"] = variableToIndex.size + 1 diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt index 9732954ddcd3..1666d0729740 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt @@ -48,4 +48,41 @@ class ObjectStoragePathFactoryUTest { assertNotNull(match2) assertEquals(match2?.customSuffix, "-1") } + + @Test + fun `test file pattern with variable in prefix`() { + every { pathConfigProvider.objectStoragePathConfiguration } returns + ObjectStoragePathConfiguration( + "prefix-\${NAMESPACE}", + "staging-\${NAMESPACE}", + "\${STREAM_NAME}/", + "any_filename", + true, + ) + val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) + assertEquals( + "prefix-test/stream/any_filename", + factory.getPathToFile(stream, 0L, isStaging = false) + ) + assertEquals( + "staging-test/stream/any_filename", + factory.getPathToFile(stream, 0L, isStaging = true) + ) + } + + @Test + fun `test pattern matcher with variable in prefix`() { + every { pathConfigProvider.objectStoragePathConfiguration } returns + ObjectStoragePathConfiguration( + "prefix-\${NAMESPACE}", + "staging-\${NAMESPACE}", + "\${STREAM_NAME}/", + "any_filename", + true, + ) + val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) + val matcher = factory.getPathMatcher(stream, "(-foo)?") + assertNotNull(matcher.match("prefix-test/stream/any_filename")) + assertNotNull(matcher.match("prefix-test/stream/any_filename-foo")) + } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt index de22e0674f3f..141b4ad393cb 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt @@ -210,7 +210,7 @@ class ObjectStorageDestinationStateTest { ): List> { val genIdKey = ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY val prefix = - "${d.pathFactory.prefix}/${stream.descriptor.namespace}/${stream.descriptor.name}" + "${d.pathFactory.finalPrefix}/${stream.descriptor.namespace}/${stream.descriptor.name}" val generations = listOf( Triple(0, "$prefix/key1-0", 0L), diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt index c76239ebcec1..58bad45f0515 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt @@ -17,7 +17,7 @@ open class MockPathFactory : PathFactory { override val supportsStaging: Boolean get() = doSupportStaging - override val prefix: String + override val finalPrefix: String get() = "prefix" private fun fromStream(stream: DestinationStream): String { @@ -28,14 +28,14 @@ open class MockPathFactory : PathFactory { stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - return "$prefix/staging/${fromStream(stream)}" + return "$finalPrefix/staging/${fromStream(stream)}" } override fun getFinalDirectory( stream: DestinationStream, substituteStreamAndNamespaceOnly: Boolean ): String { - return "$prefix/${fromStream(stream)}" + return "$finalPrefix/${fromStream(stream)}" } override fun getPathToFile( @@ -66,7 +66,7 @@ open class MockPathFactory : PathFactory { return PathMatcher( regex = Regex( - "$prefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$" + "$finalPrefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$" ), variableToIndex = mapOf("part_number" to 4) )