Skip to content

Commit

Permalink
Destination S3-V2: Bug: Honor path variables in bucket prefix (#51039)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Jan 10, 2025
1 parent 78c4375 commit 05b6d0b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ interface PathFactory {
fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher

val supportsStaging: Boolean
val prefix: String
val finalPrefix: String
}

data class PathMatcher(val regex: Regex, val variableToIndex: Map<String, Int>) {
Expand All @@ -63,23 +63,14 @@ class ObjectStoragePathFactory(
compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null,
private val timeProvider: TimeProvider,
) : PathFactory {
// Resolved configuration
private val pathConfig = pathConfigProvider.objectStoragePathConfiguration
override val supportsStaging: Boolean = pathConfig.usesStagingDirectory

// Resolved bucket path prefixes
private val stagingPrefixResolved =
pathConfig.stagingPrefix
?: Paths.get(pathConfig.prefix, DEFAULT_STAGING_PREFIX_SUFFIX).toString()
private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT
private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT
private val fileFormatExtension =
formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val compressionExtension =
compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension
private val defaultExtension =
if (fileFormatExtension != null && compressionExtension != null) {
"$fileFormatExtension.$compressionExtension"
} else {
fileFormatExtension ?: compressionExtension
}

private val stagingPrefix: String
get() =
if (!pathConfig.usesStagingDirectory) {
Expand All @@ -89,15 +80,29 @@ class ObjectStoragePathFactory(
} else {
stagingPrefixResolved
}

override val supportsStaging: Boolean = pathConfig.usesStagingDirectory
override val prefix: String =
override val finalPrefix: String =
if (pathConfig.prefix.endsWith('/')) {
pathConfig.prefix.take(pathConfig.prefix.length - 1)
} else {
pathConfig.prefix
}

// Resolved path and filename patterns
private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT
private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT

// Resolved file extensions
private val fileFormatExtension =
formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val compressionExtension =
compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension
private val defaultExtension =
if (fileFormatExtension != null && compressionExtension != null) {
"$fileFormatExtension.$compressionExtension"
} else {
fileFormatExtension ?: compressionExtension
}

/**
* Variable substitution is complex.
*
Expand Down Expand Up @@ -252,10 +257,17 @@ class ObjectStoragePathFactory(
}
}

/**
* This is to maintain parity with legacy code. Whether the path pattern ends with "/" is
* significant.
*
* * path: "{STREAM_NAME}/foo/" + "{part_number}{format_extension}" => "my_stream/foo/1.json"
* * path: "{STREAM_NAME}/foo" + "{part_number}{format_extension}" => "my_stream/foo1.json"
*/
private fun resolveRetainingTerminalSlash(prefix: String, path: String): String {
val asPath = Paths.get(prefix, path)
return if (path.endsWith('/')) {
asPath.toString() + "/"
"$asPath/"
} else {
asPath.toString()
}
Expand All @@ -265,26 +277,24 @@ class ObjectStoragePathFactory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES
)
return resolveRetainingTerminalSlash(stagingPrefix, path)
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = true
)
}

/**
 * Returns the final (non-staging) directory path for [stream].
 *
 * When [substituteStreamAndNamespaceOnly] is true, only the variables in
 * PATH_VARIABLES_STREAM_CONSTANT are passed to getFormattedPath; otherwise the full
 * PATH_VARIABLES list is passed.
 */
override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES
)
return resolveRetainingTerminalSlash(prefix, path)
// isStaging = false makes getFormattedPath join the pattern onto finalPrefix.
return getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = false
)
}

override fun getLongestStreamConstantPrefix(
Expand Down Expand Up @@ -323,9 +333,11 @@ class ObjectStoragePathFactory(

/**
 * Builds the directory path for [stream] by folding over [variables] and letting each one
 * apply itself (via maybeApply) to the path pattern, in list order.
 *
 * [isStaging] selects which prefix (stagingPrefix or finalPrefix) is joined onto the path
 * pattern before substitution, so variables that appear in the prefix are substituted as well.
 */
private fun getFormattedPath(
stream: DestinationStream,
variables: List<PathVariable> = PATH_VARIABLES
variables: List<PathVariable> = PATH_VARIABLES,
isStaging: Boolean
): String {
val pattern = pathPatternResolved
val selectedPrefix = if (isStaging) stagingPrefix else finalPrefix
// Join prefix and pattern first; a trailing "/" on the pattern is preserved by the helper.
val pattern = resolveRetainingTerminalSlash(selectedPrefix, pathPatternResolved)
val context = VariableContext(stream)
return variables.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) }
}
Expand Down Expand Up @@ -375,13 +387,10 @@ class ObjectStoragePathFactory(
val pathVariableToPattern = getPathVariableToPattern(stream)
val variableToIndex = mutableMapOf<String, Int>()

val pathPattern = resolveRetainingTerminalSlash(finalPrefix, pathPatternResolved)

val replacedForPath =
buildPattern(
pathPatternResolved,
"""\\\$\{(\w+)}""",
pathVariableToPattern,
variableToIndex
)
buildPattern(pathPattern, """\\\$\{(\w+)}""", pathVariableToPattern, variableToIndex)
val replacedForFile =
buildPattern(
filePatternResolved,
Expand All @@ -391,12 +400,9 @@ class ObjectStoragePathFactory(
)
// NOTE: the old code does not actually resolve the path + filename,
// even though the documentation says it does.
val combined =
if (replacedForPath.startsWith('/')) {
"${prefix}$replacedForPath$replacedForFile"
} else {
"$prefix/$replacedForPath$replacedForFile"
}
val replacedForPathWithEmptyVariablesRemoved =
resolveRetainingTerminalSlash("", replacedForPath)
val combined = "$replacedForPathWithEmptyVariablesRemoved$replacedForFile"
val withSuffix =
if (suffixPattern != null) {
variableToIndex["suffix"] = variableToIndex.size + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,41 @@ class ObjectStoragePathFactoryUTest {
assertNotNull(match2)
assertEquals(match2?.customSuffix, "-1")
}

/**
 * Variables that appear in the configured prefixes (here `${NAMESPACE}`) must be substituted
 * when the full path to a file is built, for both the final and the staging location.
 */
@Test
fun `test file pattern with variable in prefix`() {
    val configWithVariablePrefixes =
        ObjectStoragePathConfiguration(
            "prefix-\${NAMESPACE}",
            "staging-\${NAMESPACE}",
            "\${STREAM_NAME}/",
            "any_filename",
            true,
        )
    every { pathConfigProvider.objectStoragePathConfiguration } returns
        configWithVariablePrefixes
    val pathFactory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)

    // Final location first, then staging; each path is checked before the next is built.
    val finalPath = pathFactory.getPathToFile(stream, 0L, isStaging = false)
    assertEquals("prefix-test/stream/any_filename", finalPath)

    val stagingPath = pathFactory.getPathToFile(stream, 0L, isStaging = true)
    assertEquals("staging-test/stream/any_filename", stagingPath)
}

/**
 * The path matcher must accept paths whose prefix contains a substituted variable, both
 * without and with the optional custom suffix.
 */
@Test
fun `test pattern matcher with variable in prefix`() {
    val configWithVariablePrefixes =
        ObjectStoragePathConfiguration(
            "prefix-\${NAMESPACE}",
            "staging-\${NAMESPACE}",
            "\${STREAM_NAME}/",
            "any_filename",
            true,
        )
    every { pathConfigProvider.objectStoragePathConfiguration } returns
        configWithVariablePrefixes
    val pathFactory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)
    val pathMatcher = pathFactory.getPathMatcher(stream, "(-foo)?")

    // Checked in the same order as before: bare file name first, then with the suffix.
    listOf(
            "prefix-test/stream/any_filename",
            "prefix-test/stream/any_filename-foo",
        )
        .forEach { candidatePath -> assertNotNull(pathMatcher.match(candidatePath)) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class ObjectStorageDestinationStateTest {
): List<Triple<Int, String, Long>> {
val genIdKey = ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY
val prefix =
"${d.pathFactory.prefix}/${stream.descriptor.namespace}/${stream.descriptor.name}"
"${d.pathFactory.finalPrefix}/${stream.descriptor.namespace}/${stream.descriptor.name}"
val generations =
listOf(
Triple(0, "$prefix/key1-0", 0L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ open class MockPathFactory : PathFactory {

override val supportsStaging: Boolean
get() = doSupportStaging
override val prefix: String
override val finalPrefix: String
get() = "prefix"

private fun fromStream(stream: DestinationStream): String {
Expand All @@ -28,14 +28,14 @@ open class MockPathFactory : PathFactory {
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return "$prefix/staging/${fromStream(stream)}"
return "$finalPrefix/staging/${fromStream(stream)}"
}

/**
 * Mock final directory: the mock's fixed prefix joined by "/" with fromStream(stream).
 * [substituteStreamAndNamespaceOnly] is not read by this implementation.
 */
override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
return "$prefix/${fromStream(stream)}"
return "$finalPrefix/${fromStream(stream)}"
}

override fun getPathToFile(
Expand Down Expand Up @@ -66,7 +66,7 @@ open class MockPathFactory : PathFactory {
return PathMatcher(
regex =
Regex(
"$prefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$"
"$finalPrefix/(${stream.descriptor.namespace})/(${stream.descriptor.name})/(.*)-(.*)$"
),
variableToIndex = mapOf("part_number" to 4)
)
Expand Down

0 comments on commit 05b6d0b

Please sign in to comment.