Skip to content

Commit

Permalink
Destination Iceberg V2: support aws instance profile auth (#50876)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jan 10, 2025
1 parent 8b7ed0b commit 78c4375
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
Expand All @@ -40,6 +43,7 @@ class CheckOperation<T : ConfigurationSpecification, C : DestinationConfiguratio
)
outputConsumer.accept(successMessage)
} catch (t: Throwable) {
logger.warn(t) { "Caught throwable during CHECK" }
val (traceMessage, statusMessage) = exceptionHandler.handleCheckFailure(t)
outputConsumer.accept(traceMessage)
outputConsumer.accept(statusMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.11
dockerImageTag: 0.2.12
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.load.file.s3.S3ClientFactory
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sts.StsClient
Expand Down Expand Up @@ -40,9 +41,13 @@ class GlueCredentialsProvider private constructor(private val delegate: AwsCrede
val provider =
when (mode) {
AWS_CREDENTIALS_MODE_STATIC_CREDS -> {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, secretKey)
)
if (accessKey != null && secretKey != null) {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, secretKey)
)
} else {
DefaultCredentialsProvider.create()
}
}
AWS_CREDENTIALS_MODE_ASSUME_ROLE -> {
StsAssumeRoleCredentialsProvider.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,27 +354,31 @@ class S3DataLakeUtil(
private fun buildKeyBasedClientProperties(
config: S3DataLakeConfiguration
): Map<String, String> {
val awsAccessKeyId =
requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) {
"AWS Access Key ID is required for key-based authentication"
}
val awsSecretAccessKey =
requireNotNull(config.awsAccessKeyConfiguration.secretAccessKey) {
"AWS Secret Access Key is required for key-based authentication"
}
val clientCredentialsProviderPrefix = "${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}."

return mapOf(
S3FileIOProperties.ACCESS_KEY_ID to awsAccessKeyId,
S3FileIOProperties.SECRET_ACCESS_KEY to awsSecretAccessKey,
AwsClientProperties.CLIENT_REGION to config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
"${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}.$AWS_CREDENTIALS_MODE" to
AWS_CREDENTIALS_MODE_STATIC_CREDS,
"${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}" to awsAccessKeyId,
"${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}" to awsSecretAccessKey
)
val properties =
mutableMapOf(
AwsClientProperties.CLIENT_REGION to
config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
"${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}.$AWS_CREDENTIALS_MODE" to
AWS_CREDENTIALS_MODE_STATIC_CREDS,
)

// If we don't have explicit S3 creds, fall back to the default creds provider chain.
// For example, this should allow us to use AWS instance profiles.
val awsAccessKeyId = config.awsAccessKeyConfiguration.accessKeyId
val awsSecretAccessKey = config.awsAccessKeyConfiguration.secretAccessKey
if (awsAccessKeyId != null && awsSecretAccessKey != null) {
properties[S3FileIOProperties.ACCESS_KEY_ID] = awsAccessKeyId
properties[S3FileIOProperties.SECRET_ACCESS_KEY] = awsSecretAccessKey
properties["${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}"] = awsAccessKeyId
properties["${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}"] =
awsSecretAccessKey
}

return properties
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------|
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
Expand Down

0 comments on commit 78c4375

Please sign in to comment.